This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 71f5d667ed7c9e5dd316c81dd19d159766c50c0f Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Apr 9 17:53:08 2018 +0200 CAMEL-12427: camel-netty4 - Add SPI to plugin custom correlation state for request/reply in producer --- .../netty4/NettyCustomCorrelationManagerTest.java | 122 +++++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCustomCorrelationManagerTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCustomCorrelationManagerTest.java new file mode 100644 index 0000000..35b2d6b --- /dev/null +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCustomCorrelationManagerTest.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import org.apache.camel.Exchange; +import org.apache.camel.Predicate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.JndiRegistry; +import org.junit.Test; + +public class NettyCustomCorrelationManagerTest extends BaseNettyTest { + + private final MyCorrelationManager myManager = new MyCorrelationManager(); + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + jndi.bind("myManager", myManager); + return jndi; + } + + @Test + public void testCustomCorrelationManager() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + // the messages can be processed in any order + mock.expectedBodiesReceivedInAnyOrder("Bye A", "Bye B", "Bye C"); + // the custom manager should be used + mock.allMessages().header("manager").isEqualTo(myManager); + // check that the request and reply are correlated correctly + mock.allMessages().predicate(exchange -> { + String request = exchange.getMessage().getHeader("request", String.class); + String reply = exchange.getMessage().getBody(String.class); + return reply.endsWith(request); + }); + + template.sendBodyAndHeader("seda:start", "A", "request", "A"); + template.sendBodyAndHeader("seda:start", "B", "request", "B"); + template.sendBodyAndHeader("seda:start", "C", "request", "C"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:start") + .log("before ${body}") + .to("netty4:tcp://localhost:{{port}}?textline=true&sync=true&correlationManager=#myManager") + .log("after ${body}") + .to("mock:result"); + + from("netty4:tcp://localhost:{{port}}?textline=true&sync=true") + .transform(body().prepend("Bye ")); + } + }; + } + + private static final class MyCorrelationManager implements NettyCamelStateCorrelationManager { + + private volatile NettyCamelState stateA; + private volatile NettyCamelState stateB; + private volatile NettyCamelState stateC; + + @Override + public void putState(Channel channel, NettyCamelState state) { + String body = state.getExchange().getMessage().getBody(String.class); + if ("A".equals(body)) { + stateA = state; + } else if ("B".equals(body)) { + stateB = state; + } else if ("C".equals(body)) { + stateC = state; + } + } + + @Override + public void removeState(ChannelHandlerContext ctx, Channel channel) { + // noop + } + + @Override + public NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, Object msg) { + String body = msg.toString(); + if (body.endsWith("A")) { + stateA.getExchange().getMessage().setHeader("manager", this); + return stateA; + } else if (body.endsWith("B")) { + stateB.getExchange().getMessage().setHeader("manager", this); + return stateB; + } else if (body.endsWith("C")) { + stateC.getExchange().getMessage().setHeader("manager", this); + return stateC; + } + return null; + } + + @Override + public NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, Throwable cause) { + // noop + return null; + } + } +} -- To stop receiving notification emails like this one, please contact davscl...@apache.org.