http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyTransferExchangeOptionTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyTransferExchangeOptionTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyTransferExchangeOptionTest.java new file mode 100644 index 0000000..3127b98 --- /dev/null +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyTransferExchangeOptionTest.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.netty4; + +import java.nio.charset.Charset; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +/** + * @version + */ +public class NettyTransferExchangeOptionTest extends BaseNettyTest { + + @Test + public void testNettyTransferExchangeOptionWithoutException() throws Exception { + Exchange exchange = sendExchange(false); + assertExchange(exchange, false); + } + + @Test + public void testNettyTransferExchangeOptionWithException() throws Exception { + Exchange exchange = sendExchange(true); + assertExchange(exchange, true); + } + + private Exchange sendExchange(boolean setException) throws Exception { + Endpoint endpoint = context.getEndpoint("netty4:tcp://localhost:{{port}}?transferExchange=true"); + Exchange exchange = endpoint.createExchange(); + + Message message = exchange.getIn(); + message.setBody("Hello!"); + message.setHeader("cheese", "feta"); + exchange.setProperty("ham", "old"); + exchange.setProperty("setException", setException); + + Producer producer = endpoint.createProducer(); + producer.start(); + + // ensure to stop producer after usage + try { + producer.process(exchange); + } finally { + producer.stop(); + } + + return exchange; + } + + private void assertExchange(Exchange exchange, boolean hasFault) { + if (!hasFault) { + Message out = exchange.getOut(); + assertNotNull(out); + assertFalse(out.isFault()); + assertEquals("Goodbye!", out.getBody()); + assertEquals("cheddar", out.getHeader("cheese")); + } else { + Message fault = exchange.getOut(); + assertNotNull(fault); + assertTrue(fault.isFault()); + assertNotNull(fault.getBody()); + assertTrue("Should get the InterrupteException exception", fault.getBody() instanceof InterruptedException); + assertEquals("nihao", 
fault.getHeader("hello")); + } + + + // in should stay the same + Message in = exchange.getIn(); + assertNotNull(in); + assertEquals("Hello!", in.getBody()); + assertEquals("feta", in.getHeader("cheese")); + // however the shared properties have changed + assertEquals("fresh", exchange.getProperty("salami")); + assertNull(exchange.getProperty("Charset")); + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("netty4:tcp://localhost:{{port}}?transferExchange=true").process(new Processor() { + public void process(Exchange e) throws InterruptedException { + assertNotNull(e.getIn().getBody()); + assertNotNull(e.getIn().getHeaders()); + assertNotNull(e.getProperties()); + assertEquals("Hello!", e.getIn().getBody()); + assertEquals("feta", e.getIn().getHeader("cheese")); + assertEquals("old", e.getProperty("ham")); + assertEquals(ExchangePattern.InOut, e.getPattern()); + Boolean setException = (Boolean) e.getProperty("setException"); + + if (setException) { + e.getOut().setFault(true); + e.getOut().setBody(new InterruptedException()); + e.getOut().setHeader("hello", "nihao"); + } else { + e.getOut().setBody("Goodbye!"); + e.getOut().setHeader("cheese", "cheddar"); + } + e.setProperty("salami", "fresh"); + e.setProperty("Charset", Charset.defaultCharset()); + } + }); + } + }; + } +} + +
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPAsyncTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPAsyncTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPAsyncTest.java new file mode 100644 index 0000000..1124e23 --- /dev/null +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPAsyncTest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.netty4; + +import java.io.FileInputStream; +import java.io.InputStream; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.converter.IOConverter; +import org.apache.camel.util.IOHelper; +import org.junit.Test; + +public class NettyUDPAsyncTest extends BaseNettyTest { + + @EndpointInject(uri = "mock:result") + protected MockEndpoint resultEndpoint; + + private void sendFile(String uri) throws Exception { + template.send(uri, new Processor() { + public void process(Exchange exchange) throws Exception { + // Read from an input stream + InputStream is = IOHelper.buffered(new FileInputStream("src/test/resources/test.txt")); + + byte buffer[] = IOConverter.toBytes(is); + is.close(); + + // Set the property of the charset encoding + exchange.setProperty(Exchange.CHARSET_NAME, "UTF-8"); + Message in = exchange.getIn(); + in.setBody(buffer); + } + }); + } + + @Test + public void testUDPInOnlyWithNettyConsumer() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + sendFile("netty4:udp://localhost:{{port}}?sync=false"); + mock.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("netty4:udp://localhost:{{port}}?sync=false") + .to("mock:result") + .to("log:Message"); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPLargeMessageInOnlyTest.java ---------------------------------------------------------------------- diff --git 
a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPLargeMessageInOnlyTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPLargeMessageInOnlyTest.java new file mode 100644 index 0000000..a213469 --- /dev/null +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPLargeMessageInOnlyTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.netty4; + +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class NettyUDPLargeMessageInOnlyTest extends BaseNettyTest { + + private byte[] getMessageBytes(int messageSize) { + byte[] msgBytes = new byte[messageSize]; + for (int i = 0; i < messageSize; i++) { + msgBytes[i] = 'A'; + } + return msgBytes; + } + + private void sendMessage(int messageSize) throws Exception { + byte[] msgBytes = getMessageBytes(messageSize); + + assertEquals(msgBytes.length, messageSize); + String message = new String(msgBytes); + + getMockEndpoint("mock:result").expectedBodiesReceived(message); + template.sendBody("netty4:udp://localhost:{{port}}?sync=false", message); + assertMockEndpointsSatisfied(); + } + + @Test + public void testSend512Message() throws Exception { + sendMessage(512); + } + + @Test + public void testSend768Message() throws Exception { + sendMessage(768); + } + + @Test + public void testSend1024Message() throws Exception { + sendMessage(1024); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("netty4:udp://localhost:{{port}}?receiveBufferSizePredictor=2048&sync=false") + .to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPObjectSyncTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPObjectSyncTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPObjectSyncTest.java new file mode 100644 index 0000000..5d2111d --- /dev/null +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPObjectSyncTest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class NettyUDPObjectSyncTest extends BaseNettyTest { + + @Test + public void testUDPObjectInOutWithNettyConsumer() throws Exception { + Poetry poetry = new Poetry(); + Poetry response = template.requestBody("netty4:udp://localhost:{{port}}?sync=true", poetry, Poetry.class); + assertEquals("Dr. Sarojini Naidu", response.getPoet()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("netty4:udp://localhost:{{port}}?sync=true") + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + Poetry poetry = (Poetry) exchange.getIn().getBody(); + poetry.setPoet("Dr. 
Sarojini Naidu"); + exchange.getOut().setBody(poetry); + } + }); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPSyncTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPSyncTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPSyncTest.java new file mode 100644 index 0000000..b9ad248 --- /dev/null +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUDPSyncTest.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.netty4; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class NettyUDPSyncTest extends BaseNettyTest { + + @Test + public void testUDPStringInOutWithNettyConsumer() throws Exception { + for (int i = 0; i < 5; i++) { + String response = template.requestBody( + "netty4:udp://localhost:{{port}}?sync=true", + "After the Battle of Thermopylae in 480 BC - Simonides of Ceos (c. 556 BC-468 BC), Greek lyric poet wrote ?", String.class); + assertEquals("Go tell the Spartans, thou that passest by, That faithful to their precepts here we lie.", response); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("netty4:udp://localhost:{{port}}?sync=true") + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getOut().setBody("Go tell the Spartans, thou that passest by, That faithful to their precepts here we lie."); + } + }); + } + }; + } + + +} http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUdpWithInOutUsingPlainSocketTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUdpWithInOutUsingPlainSocketTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUdpWithInOutUsingPlainSocketTest.java new file mode 100644 index 0000000..7fafbd7 --- /dev/null +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUdpWithInOutUsingPlainSocketTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4; + +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @version + */ +public class NettyUdpWithInOutUsingPlainSocketTest extends BaseNettyTest { + private static final Logger LOG = LoggerFactory.getLogger(NettyUdpWithInOutUsingPlainSocketTest.class); + + @Test + public void testSendAndReceiveOnce() throws Exception { + String out = sendAndReceiveUdpMessages("World"); + assertNotNull("should receive data", out); + assertEquals("Hello World\n", out); + } + + private String sendAndReceiveUdpMessages(String input) throws Exception { + DatagramSocket socket = new DatagramSocket(); + InetAddress address = InetAddress.getByName("127.0.0.1"); + + // must append delimiter + byte[] data = (input + "\n").getBytes(); + + DatagramPacket packet = new DatagramPacket(data, data.length, address, getPort()); + LOG.debug("+++ Sending data +++"); + socket.send(packet); + + Thread.sleep(1000); + + byte[] buf = new byte[128]; + DatagramPacket receive = new DatagramPacket(buf, buf.length, address, 
getPort()); + LOG.debug("+++ Receiving data +++"); + socket.receive(receive); + + socket.close(); + + return new String(receive.getData(), 0, receive.getLength()); + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("netty4:udp://127.0.0.1:{{port}}?textline=true&sync=true").process(new Processor() { + public void process(Exchange exchange) throws Exception { + String s = exchange.getIn().getBody(String.class); + LOG.debug("Server got: " + s); + exchange.getOut().setBody("Hello " + s); + } + }); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUseSharedWorkerThreadPoolManyRoutesTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUseSharedWorkerThreadPoolManyRoutesTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUseSharedWorkerThreadPoolManyRoutesTest.java new file mode 100644 index 0000000..651e9de --- /dev/null +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUseSharedWorkerThreadPoolManyRoutesTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; +import org.jboss.netty.channel.socket.nio.BossPool; +import org.jboss.netty.channel.socket.nio.WorkerPool; +import org.junit.Test; + +/** + * @version + */ +public class NettyUseSharedWorkerThreadPoolManyRoutesTest extends BaseNettyTest { + + private JndiRegistry jndi; + private BossPool sharedBoos; + private WorkerPool sharedWorker; + private int before; + + @Override + protected boolean useJmx() { + return true; + } + + @Override + public void setUp() throws Exception { + before = Thread.activeCount(); + super.setUp(); + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + jndi = super.createRegistry(); + return jndi; + } + + @Test + public void testSharedThreadPool() throws Exception { + int delta = Thread.activeCount() - before; + + log.info("Created threads {}", delta); + assertTrue("There should not be created so many threads: " + delta, delta < 50); + + sharedWorker.shutdown(); + sharedBoos.shutdown(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + sharedWorker = new NettyWorkerPoolBuilder().withWorkerCount(10).build(); + jndi.bind("sharedWorker", sharedWorker); + sharedBoos = new NettyServerBossPoolBuilder().withBossCount(20).build(); + jndi.bind("sharedBoss", sharedBoos); + + for (int i = 0; i < 100; i++) { + from("netty4:tcp://localhost:" + getNextPort() + "?textline=true&sync=true&orderedThreadPoolExecutor=false" + + "&bossPool=#sharedBoss&workerPool=#sharedWorker") + .validate(body().isInstanceOf(String.class)) + .to("log:result") + .to("mock:result") + .transform(body().regexReplaceAll("Hello", "Bye")); + } + } + }; + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUseSharedWorkerThreadPoolTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUseSharedWorkerThreadPoolTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUseSharedWorkerThreadPoolTest.java new file mode 100644 index 0000000..7a0b273 --- /dev/null +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUseSharedWorkerThreadPoolTest.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.netty4; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; +import org.jboss.netty.channel.socket.nio.WorkerPool; +import org.junit.Test; + +/** + * @version + */ +public class NettyUseSharedWorkerThreadPoolTest extends BaseNettyTest { + + private JndiRegistry jndi; + private WorkerPool sharedServer; + private WorkerPool sharedClient; + private int port; + private int port2; + private int port3; + + @Override + protected boolean useJmx() { + return true; + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + jndi = super.createRegistry(); + return jndi; + } + + @Test + public void testSharedThreadPool() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(30); + + for (int i = 0; i < 10; i++) { + String reply = template.requestBody("netty4:tcp://localhost:" + port + "?textline=true&sync=true&workerPool=#sharedClientPool", "Hello World", String.class); + assertEquals("Bye World", reply); + + reply = template.requestBody("netty4:tcp://localhost:" + port2 + "?textline=true&sync=true&workerPool=#sharedClientPool", "Hello Camel", String.class); + assertEquals("Hi Camel", reply); + + reply = template.requestBody("netty4:tcp://localhost:" + port3 + "?textline=true&sync=true&workerPool=#sharedClientPool", "Hello Claus", String.class); + assertEquals("Hej Claus", reply); + } + + assertMockEndpointsSatisfied(); + + sharedServer.shutdown(); + sharedClient.shutdown(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // we have 3 routes, but lets try to have only 2 threads in the pool + sharedServer = new NettyWorkerPoolBuilder().withWorkerCount(2).withName("NettyServer").build(); + jndi.bind("sharedServerPool", sharedServer); + sharedClient = new NettyWorkerPoolBuilder().withWorkerCount(3).withName("NettyClient").build(); + 
jndi.bind("sharedClientPool", sharedClient); + + port = getPort(); + port2 = getNextPort(); + port3 = getNextPort(); + + from("netty4:tcp://localhost:" + port + "?textline=true&sync=true&workerPool=#sharedServerPool&orderedThreadPoolExecutor=false") + .validate(body().isInstanceOf(String.class)) + .to("log:result") + .to("mock:result") + .transform(body().regexReplaceAll("Hello", "Bye")); + + from("netty4:tcp://localhost:" + port2 + "?textline=true&sync=true&workerPool=#sharedServerPool&orderedThreadPoolExecutor=false") + .validate(body().isInstanceOf(String.class)) + .to("log:result") + .to("mock:result") + .transform(body().regexReplaceAll("Hello", "Hi")); + + from("netty4:tcp://localhost:" + port3 + "?textline=true&sync=true&workerPool=#sharedServerPool&orderedThreadPoolExecutor=false") + .validate(body().isInstanceOf(String.class)) + .to("log:result") + .to("mock:result") + .transform(body().regexReplaceAll("Hello", "Hej")); + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/Poetry.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/Poetry.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/Poetry.java new file mode 100644 index 0000000..0781e71 --- /dev/null +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/Poetry.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
import java.io.Serializable;

/**
 * Serializable bean holding a poet name and a poem text.
 * <p>
 * A new instance starts with the poet {@code "?"} and a fixed two-stanza poem; both values can
 * be replaced through the setters.
 */
public class Poetry implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Poet name of a freshly created instance. */
    private static final String UNKNOWN_POET = "?";

    /** First stanza of the default poem, without a trailing line break. */
    private static final String FIRST_STANZA =
        "ONCE in the dream of a night I stood\n"
        + "Lone in the light of a magical wood,\n"
        + "Soul-deep in visions that poppy-like sprang;\n"
        + "And spirits of Truth were the birds that sang,\n"
        + "And spirits of Love were the stars that glowed,\n"
        + "And spirits of Peace were the streams that flowed\n"
        + "In that magical wood in the land of sleep.";

    /** Second stanza of the default poem, without a trailing line break. */
    private static final String SECOND_STANZA =
        "Lone in the light of that magical grove,\n"
        + "I felt the stars of the spirits of Love\n"
        + "Gather and gleam round my delicate youth,\n"
        + "And I heard the song of the spirits of Truth;\n"
        + "To quench my longing I bent me low\n"
        + "By the streams of the spirits of Peace that flow\n"
        + "In that magical wood in the land of sleep.";

    /** Poem text of a freshly created instance: both stanzas separated by one line break. */
    private static final String DEFAULT_POEM = FIRST_STANZA + "\n" + SECOND_STANZA;

    private String poet = UNKNOWN_POET;
    private String poem = DEFAULT_POEM;

    /**
     * @return the current poet name
     */
    public String getPoet() {
        return this.poet;
    }

    /**
     * @param poet the poet name to store
     */
    public void setPoet(String poet) {
        this.poet = poet;
    }

    /**
     * @return the current poem text
     */
    public String getPoem() {
        return this.poem;
    }

    /**
     * @param poem the poem text to store
     */
    public void setPoem(String poem) {
        this.poem = poem;
    }

}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4; + +import org.apache.camel.test.spring.CamelSpringTestSupport; +import org.junit.Test; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +/** + * @version + */ +public class SpringNettyUseSharedWorkerThreadPoolTest extends CamelSpringTestSupport { + + @Test + public void testSharedThreadPool() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(30); + + for (int i = 0; i < 10; i++) { + String reply = template.requestBody("netty4:tcp://localhost:5021?textline=true&sync=true", "Hello World", String.class); + assertEquals("Hello World", reply); + + reply = template.requestBody("netty4:tcp://localhost:5022?textline=true&sync=true", "Hello Camel", String.class); + assertEquals("Hello Camel", reply); + + reply = template.requestBody("netty4:tcp://localhost:5023?textline=true&sync=true", "Hello Claus", String.class); + assertEquals("Hello Claus", reply); + } + + assertMockEndpointsSatisfied(); + } + + @Override + protected AbstractApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext("org/apache/camel/component/netty4/SpringNettyUseSharedWorkerThreadPoolTest.xml"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/UnsharableCodecsConflicts2Test.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/UnsharableCodecsConflicts2Test.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/UnsharableCodecsConflicts2Test.java new file mode 100644 index 0000000..d44281a --- /dev/null +++ 
b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/UnsharableCodecsConflicts2Test.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.util.Arrays; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.JndiRegistry; +import org.jboss.netty.buffer.BigEndianHeapChannelBuffer; +import org.junit.Test; + +/** + * Sends length-prefixed frames over two client sockets to one netty4 consumer whose decoder is the registered "length-decoder" factory. + */ +public class UnsharableCodecsConflicts2Test extends BaseNettyTest { + + static final byte[] LENGTH_HEADER = {0x00, 0x00, 0x40, 0x00}; // 16384 bytes + + private Processor processor = new P(); + private int port; + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = super.createRegistry(); + + // create a single decoder + ChannelHandlerFactory decoder = ChannelHandlerFactories.newLengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4); + registry.bind("length-decoder", decoder); + + return registry; +
} + + @Test + public void unsharableCodecsConflictsTest() throws Exception { + byte[] data1 = new byte[8192]; + byte[] data2 = new byte[16383]; + Arrays.fill(data1, (byte) 0x38); + Arrays.fill(data2, (byte) 0x39); + byte[] body1 = (new String(LENGTH_HEADER) + new String(data1)).getBytes(); + byte[] body2 = (new String(LENGTH_HEADER) + new String(data2)).getBytes(); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived(new String(data2) + "9"); + + Socket client1 = getSocket("localhost", port); + Socket client2 = getSocket("localhost", port); + + // use two clients to send to the same server at the same time + try { + sendBuffer(body2, client2); + sendBuffer(body1, client1); + sendBuffer(new String("9").getBytes(), client2); + } catch (Exception e) { + log.error("", e); + } finally { + client1.close(); + client2.close(); + } + + mock.assertIsSatisfied(); + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + port = getPort(); + + from("netty4:tcp://localhost:{{port}}?decoder=#length-decoder&sync=false") + .process(processor) + .to("mock:result"); + } + }; + } + + private static Socket getSocket(String host, int port) throws IOException { + Socket s = new Socket(host, port); + s.setSoTimeout(60000); + return s; + } + + public static void sendBuffer(byte[] buf, Socket server) throws Exception { + OutputStream netOut = server.getOutputStream(); + OutputStream dataOut = new BufferedOutputStream(netOut); + try { + dataOut.write(buf, 0, buf.length); + dataOut.flush(); + } catch (Exception e) { + server.close(); + throw e; + } + } + + class P implements Processor { + + @Override + public void process(Exchange exchange) throws Exception { + exchange.getOut().setBody( + new String(((BigEndianHeapChannelBuffer) exchange.getIn() + .getBody()).array())); + } + } +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/UnsharableCodecsConflictsTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/UnsharableCodecsConflictsTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/UnsharableCodecsConflictsTest.java new file mode 100644 index 0000000..3d301ab --- /dev/null +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/UnsharableCodecsConflictsTest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.netty4; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.net.Socket; +import java.util.Arrays; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.util.IOHelper; +import org.jboss.netty.buffer.BigEndianHeapChannelBuffer; +import org.junit.Test; + +/** + * Sends length-prefixed frames to two netty4 consumers on separate ports whose decoders are bound from one shared ChannelHandlerFactory. + */ +public class UnsharableCodecsConflictsTest extends BaseNettyTest { + + static final byte[] LENGTH_HEADER = {0x00, 0x00, 0x40, 0x00}; // 0x4000 = 16384 bytes + + private Processor processor = new P(); + + private int port1; + private int port2; + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = super.createRegistry(); + + // we can share the decoder between multiple netty consumers, because they have the same configuration + // and we use a ChannelHandlerFactory + ChannelHandlerFactory decoder = ChannelHandlerFactories.newLengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4); + registry.bind("length-decoder", decoder); + registry.bind("length-decoder2", decoder); + + return registry; + } + + @Test + public void canSupplyMultipleCodecsToEndpointPipeline() throws Exception { + byte[] sPort1 = new byte[8192]; + byte[] sPort2 = new byte[16383]; + Arrays.fill(sPort1, (byte) 0x38); + Arrays.fill(sPort2, (byte) 0x39); + byte[] bodyPort1 = (new String(LENGTH_HEADER) + new String(sPort1)).getBytes(); + byte[] bodyPort2 = (new String(LENGTH_HEADER) + new String(sPort2)).getBytes(); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived(new String(sPort2) + "9"); + + Socket server1 = getSocket("localhost", port1); + Socket server2 = getSocket("localhost", port2); + + try { + sendSopBuffer(bodyPort2, server2); + sendSopBuffer(bodyPort1, server1); + sendSopBuffer(new String("9").getBytes(), 
server2); + } catch (Exception e) { + log.error("", e); + } finally { + server1.close(); + server2.close(); + } + + mock.assertIsSatisfied(); + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + port1 = getPort(); + port2 = getNextPort(); + + from("netty4:tcp://localhost:" + port1 + "?decoder=#length-decoder&sync=false") + .process(processor); + + from("netty4:tcp://localhost:" + port2 + "?decoder=#length-decoder2&sync=false") + .process(processor) + .to("mock:result"); + } + }; + } + + private static Socket getSocket(String host, int port) throws IOException { + Socket s = new Socket(host, port); + s.setSoTimeout(60000); + return s; + } + + public static void sendSopBuffer(byte[] buf, Socket server) throws Exception { + BufferedOutputStream dataOut = IOHelper.buffered(server.getOutputStream()); + try { + dataOut.write(buf, 0, buf.length); + dataOut.flush(); + } catch (Exception e) { + IOHelper.close(dataOut); + server.close(); + throw e; + } + } + + class P implements Processor { + + @Override + public void process(Exchange exchange) throws Exception { + exchange.getOut().setBody(new String(((BigEndianHeapChannelBuffer) exchange.getIn().getBody()).array())); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/test/resources/keystore.jks ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/resources/keystore.jks b/components/camel-netty4/src/test/resources/keystore.jks new file mode 100644 index 0000000..78e8571 Binary files /dev/null and b/components/camel-netty4/src/test/resources/keystore.jks differ http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/resources/log4j.properties 
b/components/camel-netty4/src/test/resources/log4j.properties new file mode 100644 index 0000000..660ba42 --- /dev/null +++ b/components/camel-netty4/src/test/resources/log4j.properties @@ -0,0 +1,38 @@ +## ------------------------------------------------------------------------ +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## ------------------------------------------------------------------------ + +# +# The logging properties used for eclipse testing, We want to see debug output on the console. 
+# +log4j.rootLogger=INFO, file + +# uncomment the following to enable camel debugging +#log4j.logger.org.apache.camel.component.netty4=TRACE +#log4j.logger.org.apache.camel=DEBUG +#log4j.logger.io.netty=TRACE + +# CONSOLE appender not used by default +log4j.appender.out=org.apache.log4j.ConsoleAppender +log4j.appender.out.layout=org.apache.log4j.PatternLayout +#log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n +log4j.appender.out.layout.ConversionPattern=%d [%-35.35t] %-5p %-30.30c{1} - %m%n + +# File appender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n +log4j.appender.file.file=target/camel-netty4-test.log http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/test/resources/org/apache/camel/component/netty4/SpringNettyUseSharedWorkerThreadPoolTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/resources/org/apache/camel/component/netty4/SpringNettyUseSharedWorkerThreadPoolTest.xml b/components/camel-netty4/src/test/resources/org/apache/camel/component/netty4/SpringNettyUseSharedWorkerThreadPoolTest.xml new file mode 100644 index 0000000..359142f --- /dev/null +++ b/components/camel-netty4/src/test/resources/org/apache/camel/component/netty4/SpringNettyUseSharedWorkerThreadPoolTest.xml @@ -0,0 +1,52 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + + <!-- use the worker pool builder to help create the shared thread pool --> + <bean id="poolBuilder" class="org.apache.camel.component.netty4.NettyWorkerPoolBuilder"> + <property name="workerCount" value="2"/> + </bean> + + <!-- the shared worker thread pool --> + <bean id="sharedPool" class="org.jboss.netty.channel.socket.nio.WorkerPool" + factory-bean="poolBuilder" factory-method="build" destroy-method="shutdown"> + </bean> + + <camelContext xmlns="http://camel.apache.org/schema/spring"> + <route> + <from uri="netty4:tcp://localhost:5021?textline=true&amp;sync=true&amp;workerPool=#sharedPool&amp;orderedThreadPoolExecutor=false"/> + <to uri="log:result"/> + <to uri="mock:result"/> + </route> + + <route> + <from uri="netty4:tcp://localhost:5022?textline=true&amp;sync=true&amp;workerPool=#sharedPool&amp;orderedThreadPoolExecutor=false"/> + <to uri="log:result"/> + <to uri="mock:result"/> + </route> + + <route> + <from uri="netty4:tcp://localhost:5023?textline=true&amp;sync=true&amp;workerPool=#sharedPool&amp;orderedThreadPoolExecutor=false"/> + <to uri="log:result"/> + <to uri="mock:result"/> + </route> + </camelContext> + +</beans> 
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/test/resources/org/apache/camel/component/netty4/multiple-codecs.xml ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/resources/org/apache/camel/component/netty4/multiple-codecs.xml b/components/camel-netty4/src/test/resources/org/apache/camel/component/netty4/multiple-codecs.xml new file mode 100644 index 0000000..cc31bce --- /dev/null +++ b/components/camel-netty4/src/test/resources/org/apache/camel/component/netty4/multiple-codecs.xml @@ -0,0 +1,72 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:util="http://www.springframework.org/schema/util" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + + <!-- START SNIPPET: routes --> + <camelContext id="multiple-netty-codecs-context" xmlns="http://camel.apache.org/schema/spring"> + <route> + <from uri="direct:multiple-codec"/> + <to uri="netty4:tcp://localhost:5150?encoders=#encoders&amp;sync=false"/> + </route> + <route> + <from uri="netty4:tcp://localhost:5150?decoders=#length-decoder,#string-decoder&amp;sync=false"/> + <to uri="mock:multiple-codec"/> + </route> + </camelContext> + <!-- END SNIPPET: routes --> + + <!-- START SNIPPET: registry-beans --> + <util:list id="decoders" list-class="java.util.LinkedList"> + <bean class="org.apache.camel.component.netty4.ChannelHandlerFactories" factory-method="newLengthFieldBasedFrameDecoder"> + <constructor-arg value="1048576"/> + <constructor-arg value="0"/> + <constructor-arg value="4"/> + <constructor-arg value="0"/> + <constructor-arg value="4"/> + </bean> + <bean class="org.jboss.netty.handler.codec.string.StringDecoder"/> + </util:list> + + <util:list id="encoders" list-class="java.util.LinkedList"> + <bean class="org.jboss.netty.handler.codec.frame.LengthFieldPrepender"> + <constructor-arg value="4"/> + </bean> + <bean class="org.jboss.netty.handler.codec.string.StringEncoder"/> + </util:list> + + <bean id="length-encoder" class="org.jboss.netty.handler.codec.frame.LengthFieldPrepender"> + <constructor-arg value="4"/> + </bean> + <bean id="string-encoder" class="org.jboss.netty.handler.codec.string.StringEncoder"/> + + <bean id="length-decoder" 
class="org.apache.camel.component.netty4.ChannelHandlerFactories" factory-method="newLengthFieldBasedFrameDecoder"> + <constructor-arg value="1048576"/> + <constructor-arg value="0"/> + <constructor-arg value="4"/> + <constructor-arg value="0"/> + <constructor-arg value="4"/> + </bean> + <bean id="string-decoder" class="org.jboss.netty.handler.codec.string.StringDecoder"/> + <!-- END SNIPPET: registry-beans --> + +</beans> http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/test/resources/test.txt ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/resources/test.txt b/components/camel-netty4/src/test/resources/test.txt new file mode 100644 index 0000000..b8713e9 --- /dev/null +++ b/components/camel-netty4/src/test/resources/test.txt @@ -0,0 +1,19 @@ +Song Of A Dream +by: Dr Sarojini Naidu + +ONCE in the dream of a night I stood +Lone in the light of a magical wood, +Soul-deep in visions that poppy-like sprang; +And spirits of Truth were the birds that sang, +And spirits of Love were the stars that glowed, +And spirits of Peace were the streams that flowed +In that magical wood in the land of sleep. + + +Lone in the light of that magical grove, +I felt the stars of the spirits of Love +Gather and gleam round my delicate youth, +And I heard the song of the spirits of Truth; +To quench my longing I bent me low +By the streams of the spirits of Peace that flow +In that magical wood in the land of sleep. \ No newline at end of file