Author: davsclaus Date: Wed Jun 13 09:25:16 2012 New Revision: 1349704 URL: http://svn.apache.org/viewvc?rev=1349704&view=rev Log: CAMEL-5225: Configured encoders and decoders must be shareable or implement ChannelHandlerFactory to be safely used with Netty.
Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactories.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactory.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ShareableChannelHandlerFactory.java camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflicts2Test.java Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactories.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactories.java?rev=1349704&view=auto ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactories.java (added) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactories.java Wed Jun 13 09:25:16 2012 @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import java.nio.charset.Charset; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.ChannelHandler; +import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; +import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder; +import org.jboss.netty.handler.codec.serialization.ClassResolvers; +import org.jboss.netty.handler.codec.serialization.ObjectDecoder; +import org.jboss.netty.handler.codec.serialization.ObjectEncoder; +import org.jboss.netty.handler.codec.string.StringDecoder; +import org.jboss.netty.handler.codec.string.StringEncoder; + +/** + * Helper to create commonly used {@link ChannelHandlerFactory} instances. + */ +public final class ChannelHandlerFactories { + + private ChannelHandlerFactories() { + } + + public static ChannelHandlerFactory newStringEncoder(Charset charset) { + return new ShareableChannelHandlerFactory(new StringEncoder(charset)); + } + + public static ChannelHandlerFactory newStringDecoder(Charset charset) { + return new ShareableChannelHandlerFactory(new StringDecoder(charset)); + } + + public static ChannelHandlerFactory newObjectDecoder() { + return new ChannelHandlerFactory() { + @Override + public ChannelHandler newChannelHandler() { + return new ObjectDecoder(ClassResolvers.weakCachingResolver(null)); + } + }; + } + + public static ChannelHandlerFactory newObjectEncoder() { + return new ShareableChannelHandlerFactory(new ObjectEncoder()); + } + + public static ChannelHandlerFactory newDelimiterBasedFrameDecoder(final int maxFrameLength, final ChannelBuffer[] delimiters) { + return new ChannelHandlerFactory() { + @Override + public ChannelHandler newChannelHandler() { + return new DelimiterBasedFrameDecoder(maxFrameLength, true, delimiters); + } + }; + } + + public static ChannelHandlerFactory newLengthFieldBasedFrameDecoder(final int maxFrameLength, final int lengthFieldOffset, + final int lengthFieldLength, final int lengthAdjustment, + final int initialBytesToStrip) { + return new ChannelHandlerFactory() { + @Override + public ChannelHandler newChannelHandler() { + return new LengthFieldBasedFrameDecoder(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip); + } + }; + } + +} Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactory.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactory.java?rev=1349704&view=auto ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactory.java (added) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactory.java Wed Jun 13 09:25:16 2012 @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import org.jboss.netty.channel.ChannelHandler; + +/** + * Factory for creating new {@link ChannelHandler} used for non shareable + * encoders and decoders configured on the Camel {@link NettyComponent}. + * <p/> + * This is needed as Netty's {@link ChannelHandler} is often not shareable + * and therefore a new instance must be created when a handler is being + * added to a pipeline. + */ +public interface ChannelHandlerFactory extends ChannelHandler { + + /** + * Creates a new {@link ChannelHandler} to be used. + */ + ChannelHandler newChannelHandler(); +} Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java?rev=1349704&r1=1349703&r2=1349704&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java Wed Jun 13 09:25:16 2012 @@ -22,9 +22,8 @@ import javax.net.ssl.SSLEngine; import org.apache.camel.component.netty.handlers.ClientChannelHandler; import org.apache.camel.component.netty.ssl.SSLEngineFactory; -import org.jboss.netty.channel.ChannelDownstreamHandler; +import org.jboss.netty.channel.ChannelHandler; import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelUpstreamHandler; import org.jboss.netty.channel.Channels; import org.jboss.netty.handler.ssl.SslHandler; import org.slf4j.Logger; @@ -46,25 +45,40 @@ public class DefaultClientPipelineFactor SslHandler sslHandler = configureClientSSLOnDemand(producer); if (sslHandler != null) { LOG.debug("Client SSL handler configured and added to the ChannelPipeline"); - channelPipeline.addLast("ssl", sslHandler); + addToPipeline("ssl", channelPipeline, sslHandler); } - List<ChannelUpstreamHandler> decoders = producer.getConfiguration().getDecoders(); + List<ChannelHandler> decoders = producer.getConfiguration().getDecoders(); for (int x = 0; x < decoders.size(); x++) { - channelPipeline.addLast("decoder-" + x, decoders.get(x)); + ChannelHandler decoder = decoders.get(x); + if (decoder instanceof ChannelHandlerFactory) { + // use the factory to create a new instance of the channel as it may not be shareable + decoder = ((ChannelHandlerFactory) decoder).newChannelHandler(); + } + addToPipeline("decoder-" + x, channelPipeline, decoder); } - List<ChannelDownstreamHandler> encoders = producer.getConfiguration().getEncoders(); + List<ChannelHandler> encoders = producer.getConfiguration().getEncoders(); for (int x = 0; x < encoders.size(); x++) { - channelPipeline.addLast("encoder-" + x, encoders.get(x)); + ChannelHandler encoder = encoders.get(x); + if (encoder instanceof ChannelHandlerFactory) { + // use the factory to create a new instance of the channel as it may not be shareable + encoder = ((ChannelHandlerFactory) encoder).newChannelHandler(); + } + addToPipeline("encoder-" + x, channelPipeline, encoder); } // our handler must be added last - channelPipeline.addLast("handler", new ClientChannelHandler(producer)); + addToPipeline("handler", channelPipeline, new ClientChannelHandler(producer)); + LOG.trace("Created ChannelPipeline: {}", channelPipeline); return channelPipeline; } + private void addToPipeline(String name, ChannelPipeline pipeline, ChannelHandler handler) { + pipeline.addLast(name, handler); + } + private SslHandler configureClientSSLOnDemand(NettyProducer producer) throws Exception { if (!producer.getConfiguration().isSsl()) { return null; Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java?rev=1349704&r1=1349703&r2=1349704&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java Wed Jun 13 09:25:16 2012 @@ -22,9 +22,8 @@ import javax.net.ssl.SSLEngine; import org.apache.camel.component.netty.handlers.ServerChannelHandler; import org.apache.camel.component.netty.ssl.SSLEngineFactory; -import org.jboss.netty.channel.ChannelDownstreamHandler; +import org.jboss.netty.channel.ChannelHandler; import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelUpstreamHandler; import org.jboss.netty.channel.Channels; import org.jboss.netty.handler.ssl.SslHandler; import org.slf4j.Logger; @@ -46,24 +45,40 @@ public class DefaultServerPipelineFactor SslHandler sslHandler = configureServerSSLOnDemand(consumer); if (sslHandler != null) { LOG.debug("Server SSL handler configured and added as an interceptor against the ChannelPipeline"); - channelPipeline.addLast("ssl", sslHandler); + addToPipeline("ssl", channelPipeline, sslHandler); } - List<ChannelDownstreamHandler> encoders = consumer.getConfiguration().getEncoders(); + + List<ChannelHandler> encoders = consumer.getConfiguration().getEncoders(); for (int x = 0; x < encoders.size(); x++) { - channelPipeline.addLast("encoder-" + x, encoders.get(x)); + ChannelHandler encoder = encoders.get(x); + if (encoder instanceof ChannelHandlerFactory) { + // use the factory to create a new instance of the channel as it may not be shareable + encoder = ((ChannelHandlerFactory) encoder).newChannelHandler(); + } + addToPipeline("encoder-" + x, channelPipeline, encoder); } - List<ChannelUpstreamHandler> decoders = consumer.getConfiguration().getDecoders(); + List<ChannelHandler> decoders = consumer.getConfiguration().getDecoders(); for (int x = 0; x < decoders.size(); x++) { - channelPipeline.addLast("decoder-" + x, decoders.get(x)); + ChannelHandler decoder = decoders.get(x); + if (decoder instanceof ChannelHandlerFactory) { + // use the factory to create a new instance of the channel as it may not be shareable + decoder = ((ChannelHandlerFactory) decoder).newChannelHandler(); + } + addToPipeline("decoder-" + x, channelPipeline, decoder); } // our handler must be added last - channelPipeline.addLast("handler", new ServerChannelHandler(consumer)); + addToPipeline("handler", channelPipeline, new ServerChannelHandler(consumer)); + LOG.trace("Created ChannelPipeline: {}", channelPipeline); return channelPipeline; } - + + private void addToPipeline(String name, ChannelPipeline pipeline, ChannelHandler handler) { + pipeline.addLast(name, handler); + } + private SslHandler configureServerSSLOnDemand(NettyConsumer consumer) throws Exception { if (!consumer.getConfiguration().isSsl()) { return null; Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java?rev=1349704&r1=1349703&r2=1349704&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java Wed Jun 13 09:25:16 2012 @@ -48,6 +48,9 @@ public class NettyComponent extends Defa config.parseURI(new URI(remaining), parameters, this); + // validate config + config.validateConfiguration(); + NettyEndpoint nettyEndpoint = new NettyEndpoint(remaining, this, config); nettyEndpoint.setTimer(getTimer()); setProperties(nettyEndpoint.getConfiguration(), parameters); Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=1349704&r1=1349703&r2=1349704&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Wed Jun 13 09:25:16 2012 @@ -26,16 +26,11 @@ import java.util.Map; import org.apache.camel.LoggingLevel; import org.apache.camel.RuntimeCamelException; import org.apache.camel.util.EndpointHelper; +import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.jsse.SSLContextParameters; -import org.jboss.netty.channel.ChannelDownstreamHandler; -import org.jboss.netty.channel.ChannelUpstreamHandler; -import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.ChannelHandler; import org.jboss.netty.handler.codec.frame.Delimiters; -import org.jboss.netty.handler.codec.serialization.ClassResolvers; -import org.jboss.netty.handler.codec.serialization.ObjectDecoder; -import org.jboss.netty.handler.codec.serialization.ObjectEncoder; -import org.jboss.netty.handler.codec.string.StringDecoder; -import org.jboss.netty.handler.codec.string.StringEncoder; import org.jboss.netty.handler.ssl.SslHandler; import org.jboss.netty.util.CharsetUtil; import org.slf4j.Logger; @@ -62,8 +57,8 @@ public class NettyConfiguration implemen private File keyStoreFile; private File trustStoreFile; private SslHandler sslHandler; - private List<ChannelDownstreamHandler> encoders = new ArrayList<ChannelDownstreamHandler>(); - private List<ChannelUpstreamHandler> decoders = new ArrayList<ChannelUpstreamHandler>(); + private List<ChannelHandler> encoders = new ArrayList<ChannelHandler>(); + private List<ChannelHandler> decoders = new ArrayList<ChannelHandler>(); private boolean ssl; private long sendBufferSize = 65536; private long receiveBufferSize = 65536; @@ -84,13 +79,14 @@ public class NettyConfiguration implemen /** * Returns a copy of this configuration */ + @SuppressWarnings("unchecked") public NettyConfiguration copy() { try { NettyConfiguration answer = (NettyConfiguration) clone(); // make sure the lists is copied in its own instance - List<ChannelDownstreamHandler> encodersCopy = new ArrayList<ChannelDownstreamHandler>(encoders); + List encodersCopy = new ArrayList(encoders); answer.setEncoders(encodersCopy); - List<ChannelUpstreamHandler> decodersCopy = new ArrayList<ChannelUpstreamHandler>(decoders); + List decodersCopy = new ArrayList(decoders); answer.setDecoders(decodersCopy); return answer; } catch (CloneNotSupportedException e) { @@ -98,6 +94,37 @@ public class NettyConfiguration implemen } } + public void validateConfiguration() { + // validate that the encoders is either shareable or is a handler factory + for (ChannelHandler encoder : encoders) { + if (encoder instanceof ChannelHandlerFactory) { + continue; + } + if (ObjectHelper.getAnnotation(encoder, ChannelHandler.Sharable.class) != null) { + continue; + } + LOG.warn("The encoder {} is not @Shareable or an ChannelHandlerFactory instance. The encoder cannot safely be used.", encoder); + } + + // validate that the decoders is either shareable or is a handler factory + for (ChannelHandler decoder : decoders) { + if (decoder instanceof ChannelHandlerFactory) { + continue; + } + if (ObjectHelper.getAnnotation(decoder, ChannelHandler.Sharable.class) != null) { + continue; + } + LOG.warn("The decoder {} is not @Shareable or an ChannelHandlerFactory instance. The decoder cannot safely be used.", decoder); + } + if (sslHandler != null) { + boolean factory = sslHandler instanceof ChannelHandlerFactory; + boolean shareable = ObjectHelper.getAnnotation(sslHandler, ChannelHandler.Sharable.class) != null; + if (!factory && !shareable) { + LOG.warn("The sslHandler {} is not @Shareable or an ChannelHandlerFactory instance. The sslHandler cannot safely be used.", sslHandler); + } + } + } + public void parseURI(URI uri, Map<String, Object> parameters, NettyComponent component) throws Exception { protocol = uri.getScheme(); @@ -118,10 +145,10 @@ public class NettyConfiguration implemen serverPipelineFactory = component.resolveAndRemoveReferenceParameter(parameters, "serverPipelineFactory", ServerPipelineFactory.class, null); // set custom encoders and decoders first - List<ChannelDownstreamHandler> referencedEncoders = component.resolveAndRemoveReferenceListParameter(parameters, "encoders", ChannelDownstreamHandler.class, null); - addToHandlersList(encoders, referencedEncoders, ChannelDownstreamHandler.class); - List<ChannelUpstreamHandler> referencedDecoders = component.resolveAndRemoveReferenceListParameter(parameters, "decoders", ChannelUpstreamHandler.class, null); - addToHandlersList(decoders, referencedDecoders, ChannelUpstreamHandler.class); + List<ChannelHandler> referencedEncoders = component.resolveAndRemoveReferenceListParameter(parameters, "encoders", ChannelHandler.class, null); + addToHandlersList(encoders, referencedEncoders, ChannelHandler.class); + List<ChannelHandler> referencedDecoders = component.resolveAndRemoveReferenceListParameter(parameters, "decoders", ChannelHandler.class, null); + addToHandlersList(decoders, referencedDecoders, ChannelHandler.class); // then set parameters with the help of the camel context type converters EndpointHelper.setReferenceProperties(component.getCamelContext(), this, parameters); @@ -133,9 +160,10 @@ public class NettyConfiguration implemen // are we textline or object? if (isTextline()) { Charset charset = getEncoding() != null ? Charset.forName(getEncoding()) : CharsetUtil.UTF_8; - encoders.add(new StringEncoder(charset)); - decoders.add(new DelimiterBasedFrameDecoder(decoderMaxLineLength, true, delimiter == TextLineDelimiter.LINE ? Delimiters.lineDelimiter() : Delimiters.nulDelimiter())); - decoders.add(new StringDecoder(charset)); + encoders.add(ChannelHandlerFactories.newStringEncoder(charset)); + ChannelBuffer[] delimiters = delimiter == TextLineDelimiter.LINE ? Delimiters.lineDelimiter() : Delimiters.nulDelimiter(); + decoders.add(ChannelHandlerFactories.newDelimiterBasedFrameDecoder(decoderMaxLineLength, delimiters)); + decoders.add(ChannelHandlerFactories.newStringDecoder(charset)); if (LOG.isDebugEnabled()) { LOG.debug("Using textline encoders and decoders with charset: {}, delimiter: {} and decoderMaxLineLength: {}", @@ -143,8 +171,8 @@ public class NettyConfiguration implemen } } else { // object serializable is then used - encoders.add(new ObjectEncoder()); - decoders.add(new ObjectDecoder(ClassResolvers.weakCachingResolver(null))); + encoders.add(ChannelHandlerFactories.newObjectEncoder()); + decoders.add(ChannelHandlerFactories.newObjectDecoder()); LOG.debug("Using object encoders and decoders"); } @@ -291,42 +319,42 @@ public class NettyConfiguration implemen this.sslHandler = sslHandler; } - public List<ChannelDownstreamHandler> getEncoders() { + public List<ChannelHandler> getDecoders() { + return decoders; + } + + public void setDecoders(List<ChannelHandler> decoders) { + this.decoders = decoders; + } + + public List<ChannelHandler> getEncoders() { return encoders; } - public List<ChannelUpstreamHandler> getDecoders() { - return decoders; + public void setEncoders(List<ChannelHandler> encoders) { + this.encoders = encoders; } - public ChannelDownstreamHandler getEncoder() { + public ChannelHandler getEncoder() { return encoders.isEmpty() ? null : encoders.get(0); } - public void setEncoder(ChannelDownstreamHandler encoder) { + public void setEncoder(ChannelHandler encoder) { if (!encoders.contains(encoder)) { encoders.add(encoder); } } - public void setEncoders(List<ChannelDownstreamHandler> encoders) { - this.encoders = encoders; - } - - public ChannelUpstreamHandler getDecoder() { + public ChannelHandler getDecoder() { return decoders.isEmpty() ? null : decoders.get(0); } - public void setDecoder(ChannelUpstreamHandler decoder) { + public void setDecoder(ChannelHandler decoder) { if (!decoders.contains(decoder)) { decoders.add(decoder); } } - public void setDecoders(List<ChannelUpstreamHandler> decoders) { - this.decoders = decoders; - } - public long getSendBufferSize() { return sendBufferSize; } @@ -451,17 +479,6 @@ public class NettyConfiguration implemen return host + ":" + port; } - private <T> void addToHandlersList(List<T> configured, List<T> handlers, Class<T> handlerType) { - if (handlers != null) { - for (int x = 0; x < handlers.size(); x++) { - T handler = handlers.get(x); - if (handlerType.isInstance(handler)) { - configured.add(handler); - } - } - } - } - public void setClientPipelineFactory(ClientPipelineFactory clientPipelineFactory) { this.clientPipelineFactory = clientPipelineFactory; } @@ -477,7 +494,7 @@ public class NettyConfiguration implemen public ServerPipelineFactory getServerPipelineFactory() { return serverPipelineFactory; } - + public int getWorkerCount() { return workerCount; } @@ -493,4 +510,15 @@ public class NettyConfiguration implemen public void setSslContextParameters(SSLContextParameters sslContextParameters) { this.sslContextParameters = sslContextParameters; } + + private static <T> void addToHandlersList(List<T> configured, List<T> handlers, Class<T> handlerType) { + if (handlers != null) { + for (int x = 0; x < handlers.size(); x++) { + T handler = handlers.get(x); + if (handlerType.isInstance(handler)) { + configured.add(handler); + } + } + } + } } Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ShareableChannelHandlerFactory.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ShareableChannelHandlerFactory.java?rev=1349704&view=auto ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ShareableChannelHandlerFactory.java (added) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ShareableChannelHandlerFactory.java Wed Jun 13 09:25:16 2012 @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import org.jboss.netty.channel.ChannelHandler; + +/** + * A {@link ChannelHandlerFactory} returning a shareable {@link ChannelHandler}. + */ +public class ShareableChannelHandlerFactory implements ChannelHandlerFactory { + + private final ChannelHandler channelHandler; + + public ShareableChannelHandlerFactory(ChannelHandler channelHandler) { + this.channelHandler = channelHandler; + } + + @Override + public ChannelHandler newChannelHandler() { + return channelHandler; + } +} Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java?rev=1349704&r1=1349703&r2=1349704&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java (original) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java Wed Jun 13 09:25:16 2012 @@ -24,6 +24,7 @@ import org.apache.camel.builder.RouteBui import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.impl.JndiRegistry; import org.jboss.netty.channel.ChannelDownstreamHandler; +import org.jboss.netty.channel.ChannelHandler; import org.jboss.netty.channel.ChannelUpstreamHandler; import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder; import org.jboss.netty.handler.codec.frame.LengthFieldPrepender; @@ -38,7 +39,8 @@ public class MultipleCodecsTest extends JndiRegistry registry = super.createRegistry(); // START SNIPPET: registry-beans - LengthFieldBasedFrameDecoder lengthDecoder = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4); + ChannelHandlerFactory lengthDecoder = ChannelHandlerFactories.newLengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4); + StringDecoder stringDecoder = new StringDecoder(); registry.bind("length-decoder", lengthDecoder); registry.bind("string-decoder", stringDecoder); @@ -48,11 +50,11 @@ public class MultipleCodecsTest extends registry.bind("length-encoder", lengthEncoder); registry.bind("string-encoder", stringEncoder); - List<ChannelUpstreamHandler> decoders = new ArrayList<ChannelUpstreamHandler>(); + List<ChannelHandler> decoders = new ArrayList<ChannelHandler>(); decoders.add(lengthDecoder); decoders.add(stringDecoder); - List<ChannelDownstreamHandler> encoders = new ArrayList<ChannelDownstreamHandler>(); + List<ChannelHandler> encoders = new ArrayList<ChannelHandler>(); encoders.add(lengthEncoder); encoders.add(stringEncoder); Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflicts2Test.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflicts2Test.java?rev=1349704&view=auto ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflicts2Test.java (added) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflicts2Test.java Wed Jun 13 09:25:16 2012 @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.util.Arrays; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.JndiRegistry; +import org.jboss.netty.buffer.BigEndianHeapChannelBuffer; +import org.junit.Test; + +/** + * + */ +public class UnsharableCodecsConflicts2Test extends BaseNettyTest { + + static final byte[] LENGTH_HEADER = {0x00, 0x00, 0x40, 0x00}; // 16384 bytes + + private Processor processor = new P(); + private int port; + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = super.createRegistry(); + + // create a single decoder + ChannelHandlerFactory decoder = ChannelHandlerFactories.newLengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4); + registry.bind("length-decoder", decoder); + + return registry; + } + + @Test + public void unsharableCodecsConflictsTest() throws Exception { + byte[] data1 = new byte[8192]; + byte[] data2 = new byte[16383]; + Arrays.fill(data1, (byte) 0x38); + Arrays.fill(data2, (byte) 0x39); + byte[] body1 = (new String(LENGTH_HEADER) + new String(data1)).getBytes(); + byte[] body2 = (new String(LENGTH_HEADER) + new String(data2)).getBytes(); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived(new String(data2) + "9"); + + Socket client1 = getSocket("localhost", port); + Socket client2 = getSocket("localhost", port); + + // use two clients to send to the same server at the same time + try { + sendBuffer(body2, client2); + sendBuffer(body1, client1); + sendBuffer(new String("9").getBytes(), client2); + } catch (Exception e) { + log.error("", e); + } finally { + client1.close(); + client2.close(); + } + + mock.assertIsSatisfied(); + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + port = getPort(); + + from("netty:tcp://localhost:{{port}}?decoder=#length-decoder&sync=false") + .process(processor) + .to("mock:result"); + } + }; + } + + private static Socket getSocket(String host, int port) throws IOException { + Socket s = new Socket(host, port); + s.setSoTimeout(60000); + return s; + } + + public static void sendBuffer(byte[] buf, Socket server) throws Exception { + OutputStream netOut = server.getOutputStream(); + OutputStream dataOut = new BufferedOutputStream(netOut); + try { + dataOut.write(buf, 0, buf.length); + dataOut.flush(); + } catch (Exception e) { + server.close(); + throw e; + } + } + + class P implements Processor { + + @Override + public void process(Exchange exchange) throws Exception { + exchange.getOut().setBody( + new String(((BigEndianHeapChannelBuffer) exchange.getIn() + .getBody()).array())); + } + } +} Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java?rev=1349704&r1=1349703&r2=1349704&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java (original) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java Wed Jun 13 09:25:16 2012 @@ -28,7 +28,6 @@ import org.apache.camel.component.mock.M import org.apache.camel.impl.JndiRegistry; import org.apache.camel.util.IOHelper; import org.jboss.netty.buffer.BigEndianHeapChannelBuffer; -import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder; import org.junit.Test; /** @@ -47,12 +46,11 @@ public class UnsharableCodecsConflictsTe protected JndiRegistry createRegistry() throws Exception { JndiRegistry registry = super.createRegistry(); - // the decoders cannot be shared with multiple netty consumers, so we need one for each consumer - LengthFieldBasedFrameDecoder lengthDecoder = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4); - registry.bind("length-decoder", lengthDecoder); - - LengthFieldBasedFrameDecoder lengthDecoder2 = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4); - registry.bind("length-decoder2", lengthDecoder2); + // we can share the decoder between multiple netty consumers, because they have the same configuration + // and we use a ChannelHandlerFactory + ChannelHandlerFactory decoder = ChannelHandlerFactories.newLengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4); + registry.bind("length-decoder", decoder); + registry.bind("length-decoder2", decoder); return registry; } Modified: camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml?rev=1349704&r1=1349703&r2=1349704&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml (original) +++ camel/trunk/components/camel-netty/src/test/resources/org/apache/camel/component/netty/multiple-codecs.xml Wed Jun 13 09:25:16 2012 @@ -37,7 +37,7 @@ <!-- START SNIPPET: registry-beans --> <util:list id="decoders" list-class="java.util.LinkedList"> - <bean class="org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder"> + <bean class="org.apache.camel.component.netty.ChannelHandlerFactories" factory-method="newLengthFieldBasedFrameDecoder"> <constructor-arg value="1048576"/> <constructor-arg value="0"/> <constructor-arg value="4"/> @@ -59,7 +59,7 @@ </bean> <bean id="string-encoder" class="org.jboss.netty.handler.codec.string.StringEncoder"/> - <bean id="length-decoder" class="org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder"> + <bean id="length-decoder" class="org.apache.camel.component.netty.ChannelHandlerFactories" factory-method="newLengthFieldBasedFrameDecoder"> <constructor-arg value="1048576"/> <constructor-arg value="0"/> <constructor-arg value="4"/>