http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java index 5078508..f7e3be1d 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java @@ -16,7 +16,9 @@ */ package org.apache.camel.component.atomix.ha; +import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -30,7 +32,7 @@ import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public final class AtomixClusterView extends AbstractCamelClusterView { +final class AtomixClusterView extends AbstractCamelClusterView { private static final Logger LOGGER = LoggerFactory.getLogger(AtomixClusterView.class); private final Atomix atomix; @@ -45,12 +47,17 @@ public final class AtomixClusterView extends AbstractCamelClusterView { } @Override - public CamelClusterMember getMaster() { + public Optional<CamelClusterMember> getMaster() { if (group == null) { - throw new IllegalStateException("The view has not yet joined the cluster"); + return Optional.empty(); } - return asCamelClusterMember(group.election().term().leader()); + GroupMember leader = group.election().term().leader(); + if (leader == null) { + return Optional.empty(); + } + + return Optional.of(asCamelClusterMember(leader)); } @Override @@ -61,7 +68,7 @@ public final class AtomixClusterView extends AbstractCamelClusterView { @Override public List<CamelClusterMember> getMembers() { if (group == null) { - throw new IllegalStateException("The view has not yet joined the cluster"); + return Collections.emptyList(); } return this.group.members().stream() @@ -105,11 +112,13 @@ public final class AtomixClusterView extends AbstractCamelClusterView { class AtomixLocalMember implements CamelClusterMember { private LocalMember member; - AtomixLocalMember() { - } - @Override public String getId() { + String id = getClusterService().getId(); + if (ObjectHelper.isNotEmpty(id)) { + return id; + } + if (member == null) { throw new IllegalStateException("The view has not yet joined the cluster"); } @@ -153,10 +162,6 @@ public final class AtomixClusterView extends AbstractCamelClusterView { return this; } - - LocalMember get() { - return member; - } } class AtomixClusterMember implements CamelClusterMember { @@ -175,11 +180,7 @@ public final class AtomixClusterView extends AbstractCamelClusterView { @Override public boolean isMaster() { - return group.election().term().leader().equals(member); - } - - GroupMember get() { - return member; + return member.equals(group.election().term().leader()); } } }
http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/resources/META-INF/services/org/apache/camel/component/atomix-client ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/resources/META-INF/services/org/apache/camel/component/atomix-client b/components/camel-atomix/src/main/resources/META-INF/services/org/apache/camel/component/atomix-client deleted file mode 100644 index 5e7a620..0000000 --- a/components/camel-atomix/src/main/resources/META-INF/services/org/apache/camel/component/atomix-client +++ /dev/null @@ -1,18 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -class=org.apache.camel.component.atomix.AtomixClientComponent http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/resources/META-INF/services/org/apache/camel/component/atomix-map ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/resources/META-INF/services/org/apache/camel/component/atomix-map b/components/camel-atomix/src/main/resources/META-INF/services/org/apache/camel/component/atomix-map new file mode 100644 index 0000000..268c354 --- /dev/null +++ b/components/camel-atomix/src/main/resources/META-INF/services/org/apache/camel/component/atomix-map @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.component.atomix.client.map.AtomixClientMapComponent http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/client/AtomixClientTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/client/AtomixClientTestSupport.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/client/AtomixClientTestSupport.java new file mode 100644 index 0000000..5251627 --- /dev/null +++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/client/AtomixClientTestSupport.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client; + +import io.atomix.AtomixClient; +import io.atomix.AtomixReplica; +import io.atomix.catalyst.transport.Address; +import org.apache.camel.component.atomix.AtomixHelper; +import org.apache.camel.test.AvailablePortFinder; +import org.apache.camel.test.junit4.CamelTestSupport; + +public abstract class AtomixClientTestSupport extends CamelTestSupport { + private Address replicaAddress; + private AtomixReplica replica; + private AtomixClient client; + + @Override + protected void doPreSetup() throws Exception { + replicaAddress = new Address("127.0.0.1", AvailablePortFinder.getNextAvailable()); + + replica = AtomixReplica.builder(replicaAddress).withStorage(AtomixHelper.inMemoryStorage()).build(); + replica.bootstrap().join(); + + client = AtomixClient.builder().build(); + client.connect(replicaAddress).join(); + + super.doPreSetup(); + } + + @Override + public void tearDown() throws Exception { + if (client != null) { + client.close().join(); + } + if (replica != null) { + replica.shutdown().join(); + replica.leave().join(); + } + + super.tearDown(); + } + + // ************************************* + // properties + // ************************************* + + protected Address getReplicaAddress() { + return replicaAddress; + } + + protected AtomixReplica getReplica() { + return replica; + } + + protected AtomixClient getClient() { + return client; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConsumerTest.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConsumerTest.java new file mode 100644 index 0000000..8a7728a --- /dev/null +++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConsumerTest.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.map; + +import java.util.Collections; +import java.util.UUID; + +import io.atomix.collections.DistributedMap; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.atomix.client.AtomixClientConstants; +import org.apache.camel.component.atomix.client.AtomixClientTestSupport; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.JndiRegistry; +import org.junit.Ignore; +import org.junit.Test; + +@Ignore +public class AtomixClientMapConsumerTest extends AtomixClientTestSupport { + private static final String MAP_NAME = UUID.randomUUID().toString(); + private DistributedMap<Object, Object> map; + + // ************************************ + // Setup + // ************************************ + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = super.createRegistry(); + + AtomixClientMapComponent component = new AtomixClientMapComponent(); + component.setNodes(Collections.singletonList(getReplicaAddress())); + + registry.bind("atomix-map", component); + + return registry; + } + + @Override + protected void doPreSetup() throws Exception { + super.doPreSetup(); + + map = getClient().getMap(MAP_NAME).join(); + } + + @Override + public void tearDown() throws Exception { + map.close(); + + super.tearDown(); + } + + // ************************************ + // Test + // ************************************ + + @Test + public void test() throws Exception { + String key = context().getUuidGenerator().generateUuid(); + String put = context().getUuidGenerator().generateUuid(); + String upd = context().getUuidGenerator().generateUuid(); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.assertExchangeReceived(3); + mock.expectedBodiesReceived(put, upd, upd); + mock.expectedHeaderReceived(AtomixClientConstants.RESOURCE_KEY, key); + + map.put(key, put).join(); + map.replace(key, upd).join(); + map.remove(key).join(); + + mock.assertIsSatisfied(); + + assertEquals( + DistributedMap.Events.ADD, + mock.getExchanges().get(0).getIn().getHeader(AtomixClientConstants.EVENT_TYPE) + ); + assertEquals( + DistributedMap.Events.UPDATE, + mock.getExchanges().get(1).getIn().getHeader(AtomixClientConstants.EVENT_TYPE) + ); + assertEquals( + DistributedMap.Events.REMOVE, + mock.getExchanges().get(2).getIn().getHeader(AtomixClientConstants.EVENT_TYPE) + ); + } + + // ************************************ + // Routes + // ************************************ + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + fromF("atomix-map:%s", MAP_NAME) + .to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/client/map/AtomixClientMapProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/client/map/AtomixClientMapProducerTest.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/client/map/AtomixClientMapProducerTest.java new file mode 100644 index 0000000..f46d628 --- /dev/null +++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/client/map/AtomixClientMapProducerTest.java @@ -0,0 +1,398 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.map; + +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; + +import io.atomix.collections.DistributedMap; +import org.apache.camel.EndpointInject; +import org.apache.camel.FluentProducerTemplate; +import org.apache.camel.Message; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.atomix.client.AtomixClientAction; +import org.apache.camel.component.atomix.client.AtomixClientConstants; +import org.apache.camel.component.atomix.client.AtomixClientTestSupport; +import org.apache.camel.impl.JndiRegistry; +import org.junit.Test; + +import static org.hamcrest.core.Is.is; + +public class AtomixClientMapProducerTest extends AtomixClientTestSupport { + private static final String MAP_NAME = UUID.randomUUID().toString(); + private DistributedMap<Object, Object> map; + + @EndpointInject(uri = "direct:start") + private FluentProducerTemplate fluent; + + // ************************************ + // Setup + // ************************************ + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = super.createRegistry(); + + AtomixClientMapComponent component = new AtomixClientMapComponent(); + component.setNodes(Collections.singletonList(getReplicaAddress())); + + registry.bind("atomix-map", component); + + return registry; + } + + @Override + protected void doPreSetup() throws Exception { + super.doPreSetup(); + + map = getClient().getMap(MAP_NAME).join(); + } + + @Override + public void tearDown() throws Exception { + map.close(); + + super.tearDown(); + } + + // ************************************ + // Test + // ************************************ + + @Test + public void testPut() throws Exception { + final String key = context().getUuidGenerator().generateUuid(); + final String val = context().getUuidGenerator().generateUuid(); + + Message result; + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.PUT) + .withHeader(AtomixClientConstants.RESOURCE_KEY, key) + .withHeader(AtomixClientConstants.RESOURCE_TTL, Duration.ofMillis(250)) + .withBody(val) + .request(Message.class); + + assertFalse(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertEquals(val, result.getBody()); + assertEquals(val, map.get(key).join()); + + CountDownLatch latch = new CountDownLatch(1); + map.onRemove(e -> latch.countDown()); + + latch.await(); + + assertFalse(val, map.containsKey(key).join()); + } + + @Test + public void testPutIfAbsent() throws Exception { + final String key = context().getUuidGenerator().generateUuid(); + final String val1 = context().getUuidGenerator().generateUuid(); + final String val2 = context().getUuidGenerator().generateUuid(); + + Message result; + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.PUT_IF_ABSENT) + .withHeader(AtomixClientConstants.RESOURCE_KEY, key) + .withBody(val1) + .request(Message.class); + + assertFalse(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertEquals(val1, result.getBody()); + assertEquals(val1, map.get(key).join()); + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.PUT_IF_ABSENT) + .withHeader(AtomixClientConstants.RESOURCE_KEY, key) + .withBody(val2) + .request(Message.class); + + assertTrue(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertEquals(val1, result.getBody()); + assertEquals(val1, map.get(key).join()); + } + + @Test + public void testGet() throws Exception { + final String key = context().getUuidGenerator().generateUuid(); + final String val = context().getUuidGenerator().generateUuid(); + + Message result; + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.GET) + .withHeader(AtomixClientConstants.RESOURCE_KEY, key) + .request(Message.class); + + assertFalse(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertFalse(map.containsKey(key).join()); + + map.put(key, val).join(); + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.GET) + .withHeader(AtomixClientConstants.RESOURCE_KEY, key) + .request(Message.class); + + assertTrue(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertEquals(val, result.getBody(String.class)); + assertTrue(map.containsKey(key).join()); + } + + @Test + public void testSizeClearIsEmpty() throws Exception { + final String key = context().getUuidGenerator().generateUuid(); + final String val = context().getUuidGenerator().generateUuid(); + + Message result; + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.SIZE) + .request(Message.class); + + assertTrue(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertEquals(0, result.getBody(Integer.class).intValue()); + assertEquals(map.size().join(), result.getBody(Integer.class)); + + map.put(key, val).join(); + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.SIZE) + .request(Message.class); + + assertTrue(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertEquals(1, result.getBody(Integer.class).intValue()); + assertEquals(map.size().join(), result.getBody(Integer.class)); + + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.IS_EMPTY) + .request(Message.class); + + assertTrue(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertFalse(result.getBody(Boolean.class)); + assertFalse(map.isEmpty().join()); + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.CLEAR) + .request(Message.class); + + assertFalse(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertEquals(0, map.size().join().intValue()); + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.IS_EMPTY) + .request(Message.class); + + assertTrue(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertTrue(result.getBody(Boolean.class)); + assertTrue(map.isEmpty().join()); + } + + @Test + public void testContainsKey() throws Exception { + final String key = context().getUuidGenerator().generateUuid(); + final String val = context().getUuidGenerator().generateUuid(); + + Message result; + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.CONTAINS_KEY) + .withHeader(AtomixClientConstants.RESOURCE_KEY, key) + .withBody(val) + .request(Message.class); + + assertTrue(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertFalse(result.getBody(Boolean.class)); + assertFalse(map.containsKey(key).join()); + + map.put(key, val).join(); + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.CONTAINS_KEY) + .withHeader(AtomixClientConstants.RESOURCE_KEY, key) + .withBody(val) + .request(Message.class); + + assertTrue(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertTrue(result.getBody(Boolean.class)); + assertTrue(map.containsKey(key).join()); + } + + @Test + public void testContainsValue() throws Exception { + final String key = context().getUuidGenerator().generateUuid(); + final String val = context().getUuidGenerator().generateUuid(); + + Message result; + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.CONTAINS_VALUE) + .withHeader(AtomixClientConstants.RESOURCE_VALUE, val) + .request(Message.class); + + assertTrue(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertFalse(result.getBody(Boolean.class)); + assertFalse(map.containsKey(key).join()); + + map.put(key, val).join(); + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.CONTAINS_VALUE) + .withHeader(AtomixClientConstants.RESOURCE_VALUE, val) + .request(Message.class); + + assertTrue(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertTrue(result.getBody(Boolean.class)); + assertTrue(map.containsValue(val).join()); + } + + @Test + public void testRemove() throws Exception { + final String key = context().getUuidGenerator().generateUuid(); + final String val = context().getUuidGenerator().generateUuid(); + + map.put(key, val).join(); + + Message result; + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.REMOVE) + .withHeader(AtomixClientConstants.RESOURCE_KEY, key) + .withHeader(AtomixClientConstants.RESOURCE_VALUE, context().getUuidGenerator().generateUuid()) + .request(Message.class); + + assertTrue(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertFalse(result.getBody(Boolean.class)); + assertTrue(map.containsKey(key).join()); + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.REMOVE) + .withHeader(AtomixClientConstants.RESOURCE_KEY, key) + .withHeader(AtomixClientConstants.RESOURCE_VALUE, val) + .request(Message.class); + + assertTrue(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertTrue(result.getBody(Boolean.class)); + assertFalse(map.containsKey(key).join()); + + map.put(key, val).join(); + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.REMOVE) + .withHeader(AtomixClientConstants.RESOURCE_KEY, key) + .request(Message.class); + + assertTrue(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertEquals(val, result.getBody(String.class)); + assertFalse(map.containsKey(key).join()); + } + + @Test + public void testReplace() throws Exception { + final String key = context().getUuidGenerator().generateUuid(); + final String oldVal = context().getUuidGenerator().generateUuid(); + final String newVal = context().getUuidGenerator().generateUuid(); + + map.put(key, oldVal).join(); + + Message result; + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.REPLACE) + .withHeader(AtomixClientConstants.RESOURCE_KEY, key) + .withHeader(AtomixClientConstants.RESOURCE_OLD_VALUE, context().getUuidGenerator().generateUuid()) + .withBody(newVal) + .request(Message.class); + + assertTrue(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertFalse(result.getBody(Boolean.class)); + assertEquals(oldVal, map.get(key).join()); + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.REPLACE) + .withHeader(AtomixClientConstants.RESOURCE_KEY, key) + .withHeader(AtomixClientConstants.RESOURCE_OLD_VALUE, oldVal) + .withBody(newVal) + .request(Message.class); + + assertTrue(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertTrue(result.getBody(Boolean.class)); + assertEquals(newVal, map.get(key).join()); + + map.put(key, oldVal).join(); + + result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.REPLACE) + .withHeader(AtomixClientConstants.RESOURCE_KEY, key) + .withBody(newVal) + .request(Message.class); + + assertTrue(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertEquals(oldVal, result.getBody(String.class)); + assertEquals(newVal, map.get(key).join()); + } + + @Test + public void testValues() throws Exception { + map.put(context().getUuidGenerator().generateUuid(), context().getUuidGenerator().generateUuid()).join(); + map.put(context().getUuidGenerator().generateUuid(), context().getUuidGenerator().generateUuid()).join(); + map.put(context().getUuidGenerator().generateUuid(), context().getUuidGenerator().generateUuid()).join(); + + Message result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.VALUES) + .request(Message.class); + + assertTrue(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertThat(map.values().join(), is(result.getBody(Collection.class))); + } + + @Test + public void testEntrySet() throws Exception { + map.put(context().getUuidGenerator().generateUuid(), context().getUuidGenerator().generateUuid()).join(); + map.put(context().getUuidGenerator().generateUuid(), context().getUuidGenerator().generateUuid()).join(); + map.put(context().getUuidGenerator().generateUuid(), context().getUuidGenerator().generateUuid()).join(); + + Message result = fluent.clearAll() + .withHeader(AtomixClientConstants.RESOURCE_ACTION, AtomixClientAction.ENTRY_SET) + .request(Message.class); + + assertTrue(result.getHeader(AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT, Boolean.class)); + assertEquals(map.entrySet().join().size(), result.getBody(Set.class).size()); + } + + // ************************************ + // Routes + // ************************************ + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + from("direct:start") + .toF("atomix-map:%s", MAP_NAME); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index 29a8069..7ea2d9a 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -72,6 +72,7 @@ <atmosphere-version>2.4.10</atmosphere-version> <atmosphere-version-range>[2.4,3.0)</atmosphere-version-range> <atomix-version>1.0.8</atomix-version> + <atomix-catalyst-version>1.2.1</atomix-catalyst-version> <automaton-bundle-version>1.11-8_1</automaton-bundle-version> <avalon-bundle-version>4.3.1_1</avalon-bundle-version> <avalon-version>4.3.1</avalon-version> http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/client/map/springboot/AtomixClientMapComponentAutoConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/client/map/springboot/AtomixClientMapComponentAutoConfiguration.java b/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/client/map/springboot/AtomixClientMapComponentAutoConfiguration.java new file mode 100644 index 0000000..f6e8b5a --- /dev/null +++ b/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/client/map/springboot/AtomixClientMapComponentAutoConfiguration.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.map.springboot; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Generated; +import org.apache.camel.CamelContext; +import org.apache.camel.component.atomix.client.map.AtomixClientMapComponent; +import org.apache.camel.spi.ComponentCustomizer; +import org.apache.camel.spi.HasId; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.apache.camel.spring.boot.ComponentConfigurationProperties; +import org.apache.camel.spring.boot.util.ConditionalOnCamelContextAndAutoConfigurationBeans; +import org.apache.camel.spring.boot.util.GroupCondition; +import org.apache.camel.spring.boot.util.HierarchicalPropertiesEvaluator; +import org.apache.camel.util.IntrospectionSupport; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Lazy; + +/** + * Generated by camel-package-maven-plugin - do not edit this file! + */ +@Generated("org.apache.camel.maven.packaging.SpringBootAutoConfigurationMojo") +@Configuration +@Conditional({ConditionalOnCamelContextAndAutoConfigurationBeans.class, + AtomixClientMapComponentAutoConfiguration.GroupConditions.class}) +@AutoConfigureAfter(CamelAutoConfiguration.class) +@EnableConfigurationProperties({ComponentConfigurationProperties.class, + AtomixClientMapComponentConfiguration.class}) +public class AtomixClientMapComponentAutoConfiguration { + + private static final Logger LOGGER = LoggerFactory + .getLogger(AtomixClientMapComponentAutoConfiguration.class); + @Autowired + private ApplicationContext applicationContext; + @Autowired + private CamelContext camelContext; + @Autowired + private AtomixClientMapComponentConfiguration configuration; + @Autowired(required = false) + private List<ComponentCustomizer<AtomixClientMapComponent>> customizers; + + static class GroupConditions extends GroupCondition { + public GroupConditions() { + super("camel.component", "camel.component.atomix-map"); + } + } + + @Lazy + @Bean(name = "atomix-map-component") + @ConditionalOnMissingBean(AtomixClientMapComponent.class) + public AtomixClientMapComponent configureAtomixClientMapComponent() + throws Exception { + AtomixClientMapComponent component = new AtomixClientMapComponent(); + component.setCamelContext(camelContext); + Map<String, Object> parameters = new HashMap<>(); + IntrospectionSupport.getProperties(configuration, parameters, null, + false); + for (Map.Entry<String, Object> entry : parameters.entrySet()) { + Object value = entry.getValue(); + Class<?> paramClass = value.getClass(); + if (paramClass.getName().endsWith("NestedConfiguration")) { + Class nestedClass = null; + try { + nestedClass = (Class) paramClass.getDeclaredField( + "CAMEL_NESTED_CLASS").get(null); + HashMap<String, Object> nestedParameters = new HashMap<>(); + IntrospectionSupport.getProperties(value, nestedParameters, + null, false); + Object nestedProperty = nestedClass.newInstance(); + IntrospectionSupport.setProperties(camelContext, + camelContext.getTypeConverter(), nestedProperty, + nestedParameters); + entry.setValue(nestedProperty); + } catch (NoSuchFieldException e) { + } + } + } + IntrospectionSupport.setProperties(camelContext, + camelContext.getTypeConverter(), component, parameters); + if (ObjectHelper.isNotEmpty(customizers)) { + for (ComponentCustomizer<AtomixClientMapComponent> customizer : customizers) { + boolean useCustomizer = (customizer instanceof HasId) + ? HierarchicalPropertiesEvaluator.evaluate( + applicationContext.getEnvironment(), + "camel.component.customizer", + "camel.component.atomix-map.customizer", + ((HasId) customizer).getId()) + : HierarchicalPropertiesEvaluator.evaluate( + applicationContext.getEnvironment(), + "camel.component.customizer", + "camel.component.atomix-map.customizer"); + if (useCustomizer) { + LOGGER.debug("Configure component {}, with customizer {}", + component, customizer); + customizer.customize(component); + } + } + } + return component; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/client/map/springboot/AtomixClientMapComponentConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/client/map/springboot/AtomixClientMapComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/client/map/springboot/AtomixClientMapComponentConfiguration.java new file mode 100644 index 0000000..867d74f --- /dev/null +++ b/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/java/org/apache/camel/component/atomix/client/map/springboot/AtomixClientMapComponentConfiguration.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.map.springboot; + +import java.util.List; +import javax.annotation.Generated; +import io.atomix.AtomixClient; +import io.atomix.catalyst.transport.Address; +import org.apache.camel.spring.boot.ComponentConfigurationPropertiesCommon; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.NestedConfigurationProperty; + +/** + * Camel Atomix support + * + * Generated by camel-package-maven-plugin - do not edit this file! + */ +@Generated("org.apache.camel.maven.packaging.SpringBootAutoConfigurationMojo") +@ConfigurationProperties(prefix = "camel.component.atomix-map") +public class AtomixClientMapComponentConfiguration + extends + ComponentConfigurationPropertiesCommon { + + /** + * The shared component configuration + */ + @NestedConfigurationProperty + private C configuration; + /** + * The shared AtomixClient instance + */ + @NestedConfigurationProperty + private AtomixClient atomix; + /** + * The nodes the AtomixClient should connect to + */ + private List<Address> nodes; + /** + * The path to the AtomixClient configuration + */ + private String configurationUri; + /** + * Whether the component should resolve property placeholders on itself when + * starting. Only properties which are of String type can use property + * placeholders. + */ + private Boolean resolvePropertyPlaceholders = true; + + public C getConfiguration() { + return configuration; + } + + public void setConfiguration(C configuration) { + this.configuration = configuration; + } + + public AtomixClient getAtomix() { + return atomix; + } + + public void setAtomix(AtomixClient atomix) { + this.atomix = atomix; + } + + public List<Address> getNodes() { + return nodes; + } + + public void setNodes(List<Address> nodes) { + this.nodes = nodes; + } + + public String getConfigurationUri() { + return configurationUri; + } + + public void setConfigurationUri(String configurationUri) { + this.configurationUri = configurationUri; + } + + public Boolean getResolvePropertyPlaceholders() { + return resolvePropertyPlaceholders; + } + + public void setResolvePropertyPlaceholders( + Boolean resolvePropertyPlaceholders) { + this.resolvePropertyPlaceholders = resolvePropertyPlaceholders; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/resources/META-INF/spring.factories ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/resources/META-INF/spring.factories b/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000..a0f99a3 --- /dev/null +++ b/platforms/spring-boot/components-starter/camel-atomix-starter/src/main/resources/META-INF/spring.factories @@ -0,0 +1,19 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ +org.apache.camel.component.atomix.client.map.springboot.AtomixClientMapComponentAutoConfiguration