This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 484cd1045dd60858c4a3713191e56155dac57715 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Apr 12 10:11:07 2018 +0200 CAMEL-12427: add example for using custom correlation manager and codec with netty --- .../camel-example-netty-custom-correlation/pom.xml | 109 +++++++++++++++++++++ .../readme.adoc | 30 ++++++ .../org/apache/camel/example/netty/MyClient.java | 74 ++++++++++++++ .../apache/camel/example/netty/MyCodecDecoder.java | 72 ++++++++++++++ .../camel/example/netty/MyCodecDecoderFactory.java | 28 ++++++ .../apache/camel/example/netty/MyCodecEncoder.java | 50 ++++++++++ .../camel/example/netty/MyCodecEncoderFactory.java | 28 ++++++ .../camel/example/netty/MyCorrelationManager.java | 72 ++++++++++++++ .../org/apache/camel/example/netty/MyServer.java | 53 ++++++++++ .../src/main/resources/log4j2.properties | 23 +++++ examples/pom.xml | 1 + 11 files changed, 540 insertions(+) diff --git a/examples/camel-example-netty-custom-correlation/pom.xml b/examples/camel-example-netty-custom-correlation/pom.xml new file mode 100644 index 0000000..7a18b35 --- /dev/null +++ b/examples/camel-example-netty-custom-correlation/pom.xml @@ -0,0 +1,109 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel.example</groupId> + <artifactId>examples</artifactId> + <version>2.22.0-SNAPSHOT</version> + </parent> + + <artifactId>camel-example-netty-custom-correlation</artifactId> + <packaging>jar</packaging> + <name>Camel :: Example :: Netty Custom Correlation</name> + <description>An example for showing Camel Netty with custom codec and correlation id</description> + + <properties> + <category>Messaging</category> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-netty4</artifactId> + </dependency> + + <!-- logging --> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-jul</artifactId> + </dependency> + + </dependencies> + + <profiles> + + <profile> + <id>client</id> + <build> + <plugins> + <!-- Allows the example to be run via 'mvn compile exec:java -Pclient' --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <configuration> + <mainClass>org.apache.camel.example.netty.MyClient</mainClass> + <includePluginDependencies>false</includePluginDependencies> + </configuration> + </plugin> + </plugins> + </build> + </profile> + + <profile> + <id>server</id> + <build> + <plugins> + <!-- Allows the example to 
be run via 'mvn compile exec:java -Pserver' --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <configuration> + <mainClass>org.apache.camel.example.netty.MyServer</mainClass> + <includePluginDependencies>false</includePluginDependencies> + </configuration> + </plugin> + </plugins> + </build> + </profile> + + </profiles> + +</project> diff --git a/examples/camel-example-netty-custom-correlation/readme.adoc b/examples/camel-example-netty-custom-correlation/readme.adoc new file mode 100644 index 0000000..17d215c --- /dev/null +++ b/examples/camel-example-netty-custom-correlation/readme.adoc @@ -0,0 +1,30 @@ +== Camel Example Netty Custom Correlation + +This example shows how to use TCP communication with Netty using a custom codec +to encode and decode the data over the wire. + +The example also uses a single shared connection between the client and the server +to multiplex concurrent messages over the same connection. A custom correlation manager +is implemented to be able to correlate the request and response message pairs so you +do not mix up data with the wrong replies. + +=== How to run + +You can run this example using two JVMs. + +To start the server run: + + mvn compile exec:java -P server + +To start the client run: + + mvn compile exec:java -P client + +In the client output you should see the request/response pairs being logged. +Requests that contain the word `beer` are delayed on the server side, and you +should notice that each reply is still correlated correctly to the request thread it belongs to. +Also the messages can be interleaved when some messages are faster than others. 
+ +=== More information + +You can find more information about Apache Camel at the website: http://camel.apache.org/ diff --git a/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyClient.java b/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyClient.java new file mode 100644 index 0000000..6c69fb9 --- /dev/null +++ b/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyClient.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.example.netty; + +import java.util.Random; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.main.Main; + +/** + * Netty client which calls the server every second (the timer's default period) with a random word. 
+ */ +public final class MyClient { + + private MyClient() { + } + + public static void main(String[] args) throws Exception { + Main main = new Main(); + main.addRouteBuilder(new MyRouteBuilder()); + main.bind("myEncoder", new MyCodecEncoderFactory()); + main.bind("myDecoder", new MyCodecDecoderFactory()); + main.bind("myManager", new MyCorrelationManager()); + main.run(args); + } + + public static class MyRouteBuilder extends RouteBuilder { + + private String[] words = new String[]{"foo", "bar", "baz", "beer", "wine", "cheese"}; + private int counter; + + public int increment() { + return ++counter; + } + + public String word() { + int ran = new Random().nextInt(6); + return words[ran]; + } + + @Override + public void configure() throws Exception { + from("timer:trigger") + // set correlation id as unique incrementing number + .setHeader("corId", method(this, "increment")) + // set random word to use in request + .setHeader("word", method(this, "word")) + // build request message as a string body + .setBody(simple("#${header.corId}:${header.word}")) + // log request before + .log("Request: ${id}:${body}") + // call netty server using a single shared connection and using custom correlation manager + // to ensure we can correctly map the request and response pairs + .to("netty4:tcp://localhost:4444?sync=true&encoders=#myEncoder&decoders=#myDecoder" + + "&producerPoolEnabled=false&correlationManager=#myManager") + // log response after + .log("Response: ${id}:${body}"); + } + } +} diff --git a/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyCodecDecoder.java b/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyCodecDecoder.java new file mode 100644 index 0000000..d2222a8 --- /dev/null +++ b/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyCodecDecoder.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.example.netty; + +import java.nio.charset.Charset; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.DecoderException; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; + +/** + * Netty decoder that assembles complete messages from the frames received by Netty. + * The decoder uses a delimiter based on start and end byte markers to know when all + * data for a complete message has been received. 
+ */ +public class MyCodecDecoder extends DelimiterBasedFrameDecoder { + + private static final int MAX_FRAME_LENGTH = 4096; + + private static char startByte = 0x0b; // 11 decimal + private static char endByte1 = 0x1c; // 28 decimal + private static char endByte2 = 0x0d; // 13 decimal + + public MyCodecDecoder() { + super(MAX_FRAME_LENGTH, true, Unpooled.copiedBuffer( + new char[]{endByte1, endByte2}, Charset.defaultCharset())); + } + + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { + ByteBuf buf = (ByteBuf) super.decode(ctx, buffer); + if (buf != null) { + try { + int pos = buf.bytesBefore((byte) startByte); + if (pos >= 0) { + ByteBuf msg = buf.readerIndex(pos + 1).slice(); + return asString(msg); + } else { + throw new DecoderException("Did not find start byte " + (int) startByte); + } + } finally { + // We need to release the buf here to avoid the memory leak + buf.release(); + } + } + // Message not complete yet - return null to be called again + return null; + } + + private String asString(ByteBuf msg) { + // convert the message to a String which Camel will then use + String text = msg.toString(Charset.defaultCharset()); + return text; + } + +} diff --git a/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyCodecDecoderFactory.java b/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyCodecDecoderFactory.java new file mode 100644 index 0000000..362ba36 --- /dev/null +++ b/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyCodecDecoderFactory.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.example.netty; + +import io.netty.channel.ChannelHandler; +import org.apache.camel.component.netty4.DefaultChannelHandlerFactory; + +public class MyCodecDecoderFactory extends DefaultChannelHandlerFactory { + + @Override + public ChannelHandler newChannelHandler() { + return new MyCodecDecoder(); + } +} diff --git a/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyCodecEncoder.java b/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyCodecEncoder.java new file mode 100644 index 0000000..5568e74 --- /dev/null +++ b/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyCodecEncoder.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.example.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +/** + * Netty encoder that writes the Camel message to bytes with start and end byte markers. + */ +public class MyCodecEncoder extends MessageToByteEncoder<Object> { + + private static char startByte = 0x0b; // 11 decimal + private static char endByte1 = 0x1c; // 28 decimal + private static char endByte2 = 0x0d; // 13 decimal + + @Override + protected void encode(ChannelHandlerContext channelHandlerContext, Object message, ByteBuf byteBuf) throws Exception { + + byte[] body; + if (message instanceof String) { + body = ((String) message).getBytes(); + } else if (message instanceof byte[]) { + body = (byte[]) message; + } else { + throw new IllegalArgumentException("The message to encode is not a supported type: " + + message.getClass().getCanonicalName()); + } + + byteBuf.writeByte(startByte); + byteBuf.writeBytes(body); + byteBuf.writeByte(endByte1); + byteBuf.writeByte(endByte2); + } +} diff --git a/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyCodecEncoderFactory.java b/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyCodecEncoderFactory.java new file mode 100644 index 0000000..32392d9 --- /dev/null +++ b/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyCodecEncoderFactory.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.example.netty; + +import io.netty.channel.ChannelHandler; +import org.apache.camel.component.netty4.DefaultChannelHandlerFactory; + +public class MyCodecEncoderFactory extends DefaultChannelHandlerFactory { + + @Override + public ChannelHandler newChannelHandler() { + return new MyCodecEncoder(); + } +} diff --git a/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyCorrelationManager.java b/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyCorrelationManager.java new file mode 100644 index 0000000..3c3cd4e --- /dev/null +++ b/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyCorrelationManager.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.example.netty; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import org.apache.camel.component.netty4.NettyCamelState; +import org.apache.camel.component.netty4.NettyCamelStateCorrelationManager; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.StringHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MyCorrelationManager implements NettyCamelStateCorrelationManager { + + private static final Logger LOG = LoggerFactory.getLogger(MyCorrelationManager.class); + + private final ConcurrentMap<String, NettyCamelState> map = new ConcurrentHashMap<>(); + + @Override + public void putState(Channel channel, NettyCamelState state) { + // grab the correlation id + String body = state.getExchange().getMessage().getBody(String.class); + // the correlation id is the first part of the message + String cid = StringHelper.before(body, ":"); + if (ObjectHelper.isEmpty(cid)) { + throw new IllegalArgumentException("CorrelationID is missing"); + } + LOG.debug("putState({}) on channel: {}", cid, channel.id()); + map.put(cid, state); + } + + @Override + public void removeState(ChannelHandlerContext channelHandlerContext, Channel channel) { + // noop + } + + @Override + public NettyCamelState getState(ChannelHandlerContext channelHandlerContext, Channel channel, Object body) { + // the correlation id is the first part of the message + String cid = 
StringHelper.before(body.toString(), ":"); + if (ObjectHelper.isEmpty(cid)) { + throw new IllegalArgumentException("CorrelationID is missing"); + } + LOG.debug("getState({}) on channel: {}", cid, channel.id()); + // lets remove after use as its no longer needed + return map.remove(cid); + } + + @Override + public NettyCamelState getState(ChannelHandlerContext channelHandlerContext, Channel channel, Throwable throwable) { + // noop + return null; + } +} diff --git a/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyServer.java b/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyServer.java new file mode 100644 index 0000000..3e3bb7b --- /dev/null +++ b/examples/camel-example-netty-custom-correlation/src/main/java/org/apache/camel/example/netty/MyServer.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.example.netty; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.main.Main; + +/** + * Netty server which returns back an echo of the incoming request. 
+ */ +public final class MyServer { + + private MyServer() { + } + + public static void main(String[] args) throws Exception { + Main main = new Main(); + main.addRouteBuilder(new MyRouteBuilder()); + main.bind("myEncoder", new MyCodecEncoderFactory()); + main.bind("myDecoder", new MyCodecDecoderFactory()); + main.run(args); + } + + private static class MyRouteBuilder extends RouteBuilder { + + @Override + public void configure() throws Exception { + from("netty4:tcp://localhost:4444?sync=true&encoders=#myEncoder&decoders=#myDecoder") + .log("Request: ${id}:${body}") + .filter(simple("${body} contains 'beer'")) + // use some delay when its beer to make responses interleaved + // and make the delay asynchronous + .delay(simple("${random(2000,4000)}")).asyncDelayed().end() + .end() + .transform(simple("${body}-Echo")) + .log("Response: ${id}:${body}"); + } + } +} diff --git a/examples/camel-example-netty-custom-correlation/src/main/resources/log4j2.properties b/examples/camel-example-netty-custom-correlation/src/main/resources/log4j2.properties new file mode 100644 index 0000000..d406a9f --- /dev/null +++ b/examples/camel-example-netty-custom-correlation/src/main/resources/log4j2.properties @@ -0,0 +1,23 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. 
You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +appender.out.type = Console +appender.out.name = out +appender.out.layout.type = PatternLayout +appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n +rootLogger.level = INFO +rootLogger.appenderRef.out.ref = out diff --git a/examples/pom.xml b/examples/pom.xml index f7bfa22..4d1bd3f 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -76,6 +76,7 @@ <module>camel-example-loan-broker-jms</module> <module>camel-example-management</module> <module>camel-example-mybatis</module> + <module>camel-example-netty-custom-correlation</module> <module>camel-example-netty-http</module> <module>camel-example-olingo4-blueprint</module> <module>camel-example-opentracing</module> -- To stop receiving notification emails like this one, please contact davscl...@apache.org.