Repository: camel Updated Branches: refs/heads/master 9ecc0cc48 -> 4a5666d46
topic consumers support added. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/69138a10 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/69138a10 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/69138a10 Branch: refs/heads/master Commit: 69138a105661775ca9f2362fb9d479832baa5a09 Parents: 9e5968c Author: juanjovazquez <jvazq...@tecsisa.com> Authored: Thu Aug 28 14:06:33 2014 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Dec 3 19:57:21 2014 +0100 ---------------------------------------------------------------------- .../component/hazelcast/HazelcastConstants.java | 3 + .../listener/CamelMessageListener.java | 37 ++++++++ .../hazelcast/topic/HazelcastTopicConsumer.java | 40 +++++++++ .../hazelcast/topic/HazelcastTopicEndpoint.java | 5 +- .../hazelcast/HazelcastTopicConsumerTest.java | 91 ++++++++++++++++++++ .../hazelcast/HazelcastTopicProducerTest.java | 2 +- 6 files changed, 175 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/69138a10/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java index ebd20f6..e1a6304 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastConstants.java @@ -84,6 +84,9 @@ public final class HazelcastConstants { public static final String UPDATED = "updated"; public static final String ADDED = "added"; + // message listener actions (topic) + public static final String RECEIVED = "received"; + // storage types (map, queue, topic, multimap) public static final String MAP = "map"; public static final String MULTIMAP = "multimap"; http://git-wip-us.apache.org/repos/asf/camel/blob/69138a10/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/listener/CamelMessageListener.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/listener/CamelMessageListener.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/listener/CamelMessageListener.java new file mode 100644 index 0000000..1a6f60a --- /dev/null +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/listener/CamelMessageListener.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hazelcast.listener; + +import com.hazelcast.core.Message; +import com.hazelcast.core.MessageListener; +import org.apache.camel.component.hazelcast.HazelcastConstants; +import org.apache.camel.component.hazelcast.HazelcastDefaultConsumer; + +/** + * + */ +public class CamelMessageListener extends CamelListener implements MessageListener<Object> { + + public CamelMessageListener(HazelcastDefaultConsumer consumer, String cacheName) { + super(consumer, cacheName); + } + + public void onMessage(Message<Object> objectMessage) { + this.sendExchange(HazelcastConstants.RECEIVED, null, objectMessage); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/69138a10/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java new file mode 100644 index 0000000..958f0bd --- /dev/null +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hazelcast.topic; + +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.IQueue; +import com.hazelcast.core.ITopic; +import org.apache.camel.Endpoint; +import org.apache.camel.Processor; +import org.apache.camel.component.hazelcast.HazelcastDefaultConsumer; +import org.apache.camel.component.hazelcast.listener.CamelItemListener; +import org.apache.camel.component.hazelcast.listener.CamelMessageListener; + +/** + * + */ +public class HazelcastTopicConsumer extends HazelcastDefaultConsumer { + + public HazelcastTopicConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor, String cacheName) { + super(hazelcastInstance, endpoint, processor, cacheName); + + ITopic<Object> topic = hazelcastInstance.getTopic(cacheName); + topic.addMessageListener(new CamelMessageListener(this, cacheName)); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/69138a10/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicEndpoint.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicEndpoint.java index 34a801e..920be2e 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicEndpoint.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicEndpoint.java @@ -34,8 +34,9 @@ public class HazelcastTopicEndpoint extends HazelcastDefaultEndpoint { @Override public Consumer createConsumer(Processor processor) throws Exception { - // TODO - return null; + HazelcastTopicConsumer answer = new HazelcastTopicConsumer(hazelcastInstance, this, processor, cacheName); + configureConsumer(answer); + return answer; } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/69138a10/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastTopicConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastTopicConsumerTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastTopicConsumerTest.java new file mode 100644 index 0000000..57531f6 --- /dev/null +++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastTopicConsumerTest.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hazelcast; + +import com.hazelcast.core.*; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class HazelcastTopicConsumerTest extends HazelcastCamelTestSupport { + + @Mock + private ITopic<String> topic; + + private ArgumentCaptor<MessageListener> argument; + + @Override + @SuppressWarnings("unchecked") + protected void trainHazelcastInstance(HazelcastInstance hazelcastInstance) { + when(hazelcastInstance.<String>getTopic("foo")).thenReturn(topic); + argument = ArgumentCaptor.forClass(MessageListener.class); + when(topic.addMessageListener(argument.capture())).thenReturn("foo"); + } + + @Override + @SuppressWarnings("unchecked") + protected void verifyHazelcastInstance(HazelcastInstance hazelcastInstance) { + verify(hazelcastInstance).getTopic("foo"); + verify(topic).addMessageListener(any(MessageListener.class)); + } + + @Test + @SuppressWarnings("unchecked") + public void receive() throws InterruptedException { + MockEndpoint out = getMockEndpoint("mock:received"); + out.expectedMessageCount(1); + + final Message<String> msg = new Message<String>("foo", "foo", new java.util.Date().getTime(), null); + argument.getValue().onMessage(msg); + + assertMockEndpointsSatisfied(2000, TimeUnit.MILLISECONDS); + + this.checkHeaders(out.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.RECEIVED); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from(String.format("hazelcast:%sfoo", HazelcastConstants.TOPIC_PREFIX)).log("object...") + .choice() + .when(header(HazelcastConstants.LISTENER_ACTION).isEqualTo(HazelcastConstants.RECEIVED)) + .log("...received").to("mock:received") + .otherwise() + .log("fail!"); + } + }; + } + + private void checkHeaders(Map<String, Object> headers, String action) { + assertEquals(action, headers.get(HazelcastConstants.LISTENER_ACTION)); + assertEquals(HazelcastConstants.CACHE_LISTENER, headers.get(HazelcastConstants.LISTENER_TYPE)); + assertEquals(null, headers.get(HazelcastConstants.OBJECT_ID)); + assertNotNull(headers.get(HazelcastConstants.LISTENER_TIME)); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/69138a10/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastTopicProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastTopicProducerTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastTopicProducerTest.java index 240f5c6..37f1750 100644 --- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastTopicProducerTest.java +++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastTopicProducerTest.java @@ -58,7 +58,7 @@ public class HazelcastTopicProducerTest extends HazelcastCamelTestSupport { } @Test - public void add() { + public void publish() { template.sendBody("direct:publish", "bar"); verify(topic).publish("bar"); }