CAMEL-6555 Fixed the compile errors in camel-netty4
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fa1cdc43 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fa1cdc43 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fa1cdc43 Branch: refs/heads/master Commit: fa1cdc43f742e7f1f8ed442907dbff2cd9d2d264 Parents: 66ccde6 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Fri Jul 18 23:14:04 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Tue Jul 22 21:25:18 2014 +0800 ---------------------------------------------------------------------- .../netty4/DefaultServerPipelineFactory.java | 5 +- .../SingleTCPNettyServerBootstrapFactory.java | 7 ++- .../netty4/handlers/ClientChannelHandler.java | 7 +-- .../netty4/handlers/ServerChannelHandler.java | 2 +- .../camel/component/netty4/MyCustomCodec.java | 49 +++++++++++--------- 5 files changed, 39 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/fa1cdc43/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java index cf9e57e..92c1b21 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java @@ -16,19 +16,18 @@ */ package org.apache.camel.component.netty4; -import java.nio.channels.Channels; import java.util.List; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; -import org.apache.camel.CamelContext; -import org.apache.camel.util.ObjectHelper; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelPipeline; import io.netty.handler.ssl.SslHandler; import io.netty.util.concurrent.EventExecutorGroup; + +import org.apache.camel.CamelContext; import org.apache.camel.component.netty4.handlers.ServerChannelHandler; import org.apache.camel.component.netty4.ssl.SSLEngineFactory; import org.apache.camel.util.ObjectHelper; http://git-wip-us.apache.org/repos/asf/camel/blob/fa1cdc43/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java index 6df8b06..dbc668c 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java @@ -108,7 +108,9 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme if (!future.isSuccess()) { // if we cannot bind, the re-create channel allChannels.remove(channel); - channel = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + future = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort())); + future.awaitUninterruptibly(); + channel = future.channel(); allChannels.add(channel); } } @@ -118,7 +120,8 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme protected void doSuspend() throws Exception { if (channel != null) { LOG.debug("ServerBootstrap unbinding from {}:{}", configuration.getHost(), configuration.getPort()); - ChannelFuture future = channel.unbind(); + //TODO need to check if it's good way to unbinding the channel + ChannelFuture future = channel.close(); future.awaitUninterruptibly(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/fa1cdc43/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java index c975afd..d4651a9 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.netty4.handlers; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.apache.camel.AsyncCallback; @@ -122,14 +123,14 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { messageReceived = true; if (LOG.isTraceEnabled()) { - LOG.trace("Message received: {}", messageEvent); + LOG.trace("Message received: {}", msg); } if (producer.getConfiguration().getRequestTimeout() > 0) { - ChannelHandler handler = ctx.getPipeline().get("timeout"); + ChannelHandler handler = ctx.pipeline().get("timeout"); if (handler != null) { LOG.trace("Removing timeout channel as we received message"); - ctx.getPipeline().remove(handler); + ctx.pipeline().remove(handler); } } http://git-wip-us.apache.org/repos/asf/camel/blob/fa1cdc43/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java index d800b5d..47dce7b 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java @@ -96,7 +96,7 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> { // we want to handle the UoW consumer.createUoW(exchange); - beforeProcess(exchange, messageEvent); + beforeProcess(exchange, ctx, msg); // process accordingly to endpoint configuration if (consumer.getEndpoint().isSynchronous()) { http://git-wip-us.apache.org/repos/asf/camel/blob/fa1cdc43/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java index ee8ba36..08a9ebc 100644 --- a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java @@ -16,26 +16,26 @@ */ package org.apache.camel.component.netty4; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandler; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.handler.codec.oneone.OneToOneDecoder; -import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; +import java.util.List; -import static org.jboss.netty.buffer.ChannelBuffers.copiedBuffer; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.EmptyByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.MessageToMessageEncoder; public final class MyCustomCodec { - private static ChannelBuffer nullDelimiter = ChannelBuffers.wrappedBuffer(new byte[]{0}); + private static ByteBuf nullDelimiter = new EmptyByteBuf(ByteBufAllocator.DEFAULT); private MyCustomCodec() { // Helper class } public static ChannelHandlerFactory createMyCustomDecoder() { - ChannelBuffer[] delimiters = new ChannelBuffer[]{nullDelimiter, nullDelimiter}; + ByteBuf[] delimiters = new ByteBuf[]{nullDelimiter, nullDelimiter}; return ChannelHandlerFactories.newDelimiterBasedFrameDecoder(4096, delimiters); } @@ -48,34 +48,39 @@ public final class MyCustomCodec { } @ChannelHandler.Sharable - public static class BytesDecoder extends OneToOneDecoder { + public static class BytesDecoder extends MessageToMessageDecoder<Object> { @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { - if (!(msg instanceof ChannelBuffer)) { - return msg; + protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception { + if (!(msg instanceof ByteBuf)) { + out.add(msg); } else { // it may be empty, then return null - ChannelBuffer cb = (ChannelBuffer) msg; - if (cb.hasArray() && cb.readable()) { - return cb.array(); + ByteBuf cb = (ByteBuf) msg; + if (cb.hasArray() && cb.isReadable()) { + out.add(cb.array()); } else { - return null; + out.add((Object)null); } } + } } @ChannelHandler.Sharable - public static class BytesEncoder extends OneToOneEncoder { + public static class BytesEncoder extends MessageToMessageEncoder<Object> { @Override - protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { + protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception { if (msg instanceof byte[]) { - return copiedBuffer((byte[]) msg); + byte[] bytes = (byte[])msg; + ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(bytes.length); + buf.writeBytes(bytes); + out.add(buf); + } else { + out.add(msg); } - return msg; } } }