CAMEL-6555 Fixed the compile errors in camel-netty4

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fa1cdc43
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fa1cdc43
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fa1cdc43

Branch: refs/heads/master
Commit: fa1cdc43f742e7f1f8ed442907dbff2cd9d2d264
Parents: 66ccde6
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Fri Jul 18 23:14:04 2014 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Tue Jul 22 21:25:18 2014 +0800

----------------------------------------------------------------------
 .../netty4/DefaultServerPipelineFactory.java    |  5 +-
 .../SingleTCPNettyServerBootstrapFactory.java   |  7 ++-
 .../netty4/handlers/ClientChannelHandler.java   |  7 +--
 .../netty4/handlers/ServerChannelHandler.java   |  2 +-
 .../camel/component/netty4/MyCustomCodec.java   | 49 +++++++++++---------
 5 files changed, 39 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/fa1cdc43/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java
index cf9e57e..92c1b21 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java
@@ -16,19 +16,18 @@
  */
 package org.apache.camel.component.netty4;
 
-import java.nio.channels.Channels;
 import java.util.List;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
-import org.apache.camel.CamelContext;
-import org.apache.camel.util.ObjectHelper;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.EventExecutorGroup;
+
+import org.apache.camel.CamelContext;
 import org.apache.camel.component.netty4.handlers.ServerChannelHandler;
 import org.apache.camel.component.netty4.ssl.SSLEngineFactory;
 import org.apache.camel.util.ObjectHelper;

http://git-wip-us.apache.org/repos/asf/camel/blob/fa1cdc43/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
index 6df8b06..dbc668c 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
@@ -108,7 +108,9 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
             if (!future.isSuccess()) {
                 // if we cannot bind, the re-create channel
                 allChannels.remove(channel);
-                channel = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+                future = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+                future.awaitUninterruptibly();
+                channel = future.channel();
                 allChannels.add(channel);
             }
         }
@@ -118,7 +120,8 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
     protected void doSuspend() throws Exception {
         if (channel != null) {
             LOG.debug("ServerBootstrap unbinding from {}:{}", configuration.getHost(), configuration.getPort());
-            ChannelFuture future = channel.unbind();
+            //TODO need to check if it's good way to unbinding the channel
+            ChannelFuture future = channel.close();
             future.awaitUninterruptibly();
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/fa1cdc43/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
index c975afd..d4651a9 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.netty4.handlers;
 
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.camel.AsyncCallback;
@@ -122,14 +123,14 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
         messageReceived = true;
 
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Message received: {}", messageEvent);
+            LOG.trace("Message received: {}", msg);
         }
 
         if (producer.getConfiguration().getRequestTimeout() > 0) {
-            ChannelHandler handler = ctx.getPipeline().get("timeout");
+            ChannelHandler handler = ctx.pipeline().get("timeout");
             if (handler != null) {
                 LOG.trace("Removing timeout channel as we received message");
-                ctx.getPipeline().remove(handler);
+                ctx.pipeline().remove(handler);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/fa1cdc43/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
index d800b5d..47dce7b 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
@@ -96,7 +96,7 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> {
         // we want to handle the UoW
         consumer.createUoW(exchange);
 
-        beforeProcess(exchange, messageEvent);
+        beforeProcess(exchange, ctx, msg);
 
         // process accordingly to endpoint configuration
         if (consumer.getEndpoint().isSynchronous()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/fa1cdc43/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java
index ee8ba36..08a9ebc 100644
--- a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java
@@ -16,26 +16,26 @@
  */
 package org.apache.camel.component.netty4;
 
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandler;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+import java.util.List;
 
-import static org.jboss.netty.buffer.ChannelBuffers.copiedBuffer;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.EmptyByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.MessageToMessageEncoder;
 
 public final class MyCustomCodec {
     
-    private static ChannelBuffer nullDelimiter = ChannelBuffers.wrappedBuffer(new byte[]{0});
+    private static ByteBuf nullDelimiter = new EmptyByteBuf(ByteBufAllocator.DEFAULT);
     
     private MyCustomCodec() {
         // Helper class
     }
 
     public static ChannelHandlerFactory createMyCustomDecoder() {
-        ChannelBuffer[] delimiters = new ChannelBuffer[]{nullDelimiter, nullDelimiter};
+        ByteBuf[] delimiters = new ByteBuf[]{nullDelimiter, nullDelimiter};
         return ChannelHandlerFactories.newDelimiterBasedFrameDecoder(4096, delimiters);
     }
 
@@ -48,34 +48,39 @@ public final class MyCustomCodec {
     }
 
     @ChannelHandler.Sharable
-    public static class BytesDecoder extends OneToOneDecoder {
+    public static class BytesDecoder extends MessageToMessageDecoder<Object> {
 
         @Override
-        protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
-            if (!(msg instanceof ChannelBuffer)) {
-                return msg;
+        protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
+            if (!(msg instanceof ByteBuf)) {
+                out.add(msg);
             } else {
                 // it may be empty, then return null
-                ChannelBuffer cb = (ChannelBuffer) msg;
-                if (cb.hasArray() && cb.readable()) {
-                    return cb.array();
+                ByteBuf cb = (ByteBuf) msg;
+                if (cb.hasArray() && cb.isReadable()) {
+                    out.add(cb.array());
                 } else {
-                    return null;
+                    out.add((Object)null);
                 }
             }
+            
         }
 
     }
 
     @ChannelHandler.Sharable
-    public static class BytesEncoder extends OneToOneEncoder {
+    public static class BytesEncoder extends MessageToMessageEncoder<Object> {
 
         @Override
-        protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
+        protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
             if (msg instanceof byte[]) {
-                return copiedBuffer((byte[]) msg);
+                byte[] bytes = (byte[])msg;
+                ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(bytes.length);
+                buf.writeBytes(bytes);
+                out.add(buf);
+            } else {
+                out.add(msg);
             }
-            return msg;
         }
     }
 }

Reply via email to