CAMEL-10054: Create camel-atomix component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/08d2b0fe Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/08d2b0fe Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/08d2b0fe Branch: refs/heads/master Commit: 08d2b0fe8a91eab8e22e064299eaa74783279f2f Parents: b3681ba Author: lburgazzoli <lburgazz...@gmail.com> Authored: Wed Jun 14 18:02:53 2017 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Fri Jun 16 17:37:55 2017 +0200 ---------------------------------------------------------------------- .../apache/camel/FluentProducerTemplate.java | 5 + .../builder/DefaultFluentProducerTemplate.java | 8 + .../org/apache/camel/util/ExchangeHelper.java | 18 + .../camel/util/function/ThrowingBiFunction.java | 22 + components/camel-atomix/pom.xml | 21 +- .../src/main/docs/atomix-client-component.adoc | 6 - .../src/main/docs/atomix-map-component.adoc | 45 +++ .../atomix/AtomixAsyncMessageProcessor.java | 31 ++ .../component/atomix/AtomixConfiguration.java | 36 +- .../camel/component/atomix/AtomixHelper.java | 29 ++ .../AbstractAsyncAtomixClientProducer.java | 140 +++++++ .../client/AbstractAtomixClientComponent.java | 93 +++++ .../client/AbstractAtomixClientConsumer.java | 31 ++ .../client/AbstractAtomixClientEndpoint.java | 86 ++++ .../client/AbstractAtomixClientProducer.java | 30 ++ .../atomix/client/AtomixClientAction.java | 32 ++ .../atomix/client/AtomixClientComponent.java | 37 -- .../client/AtomixClientConfiguration.java | 7 +- .../atomix/client/AtomixClientConstants.java | 34 ++ .../atomix/client/AtomixClientHelper.java | 61 +++ .../client/map/AtomixClientMapComponent.java | 41 ++ .../map/AtomixClientMapConfiguration.java | 109 +++++ .../client/map/AtomixClientMapConsumer.java | 85 ++++ .../client/map/AtomixClientMapEndpoint.java | 53 +++ .../client/map/AtomixClientMapProducer.java | 364 +++++++++++++++++ .../cluster/AtomixClusterConfiguration.java | 60 +-- .../atomix/cluster/AtomixClusterHelper.java | 59 +-- .../atomix/ha/AtomixClusterService.java | 30 +- .../component/atomix/ha/AtomixClusterView.java | 35 +- .../org/apache/camel/component/atomix-client | 18 - .../org/apache/camel/component/atomix-map | 18 + .../atomix/client/AtomixClientTestSupport.java | 72 ++++ .../client/map/AtomixClientMapConsumerTest.java | 115 ++++++ .../client/map/AtomixClientMapProducerTest.java | 398 +++++++++++++++++++ parent/pom.xml | 1 + ...omixClientMapComponentAutoConfiguration.java | 129 ++++++ .../AtomixClientMapComponentConfiguration.java | 103 +++++ .../main/resources/META-INF/spring.factories | 19 + 38 files changed, 2319 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/camel-core/src/main/java/org/apache/camel/FluentProducerTemplate.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/FluentProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/FluentProducerTemplate.java index 688ce19..3a37890 100644 --- a/camel-core/src/main/java/org/apache/camel/FluentProducerTemplate.java +++ b/camel-core/src/main/java/org/apache/camel/FluentProducerTemplate.java @@ -137,6 +137,11 @@ public interface FluentProducerTemplate extends Service { // ----------------------------------------------------------------------- /** + * Remove the body and headers. + */ + FluentProducerTemplate clearAll(); + + /** * Set the header * * @param key the key of the header http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java index 965afef..4c6034b 100644 --- a/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java +++ b/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java @@ -124,6 +124,14 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu } @Override + public FluentProducerTemplate clearAll() { + clearBody(); + clearHeaders(); + + return this; + } + + @Override public FluentProducerTemplate withHeader(String key, Object value) { if (headers == null) { headers = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java index 06fe7bf..11b0dac 100644 --- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java @@ -129,6 +129,24 @@ public final class ExchangeHelper { } /** + * Gets the mandatory inbound header of the correct type + * + * @param message the message + * @param headerName the header name + * @param type the type + * @return the header value + * @throws TypeConversionException is thrown if error during type conversion + * @throws NoSuchHeaderException is thrown if no headers exists + */ + public static <T> T getMandatoryHeader(Message message, String headerName, Class<T> type) throws TypeConversionException, NoSuchHeaderException { + T answer = message.getHeader(headerName, type); + if (answer == null) { + throw new NoSuchHeaderException(message.getExchange(), headerName, type); + } + return answer; + } + + /** * Gets an header or property of the correct type * * @param exchange the exchange http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/camel-core/src/main/java/org/apache/camel/util/function/ThrowingBiFunction.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/function/ThrowingBiFunction.java b/camel-core/src/main/java/org/apache/camel/util/function/ThrowingBiFunction.java new file mode 100644 index 0000000..6b8cedc --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/util/function/ThrowingBiFunction.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.function; + +@FunctionalInterface +public interface ThrowingBiFunction<I1, I2, R, T extends Throwable> { + R apply(I1 in1, I2 in2) throws T; +} http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-atomix/pom.xml b/components/camel-atomix/pom.xml index ca4ad29..0c4a8db 100644 --- a/components/camel-atomix/pom.xml +++ b/components/camel-atomix/pom.xml @@ -38,9 +38,10 @@ org.apache.camel.component.atomix.*, org.apache.camel.component.atomix.client.*, org.apache.camel.component.atomix.cluster.* + org.apache.camel.component.atomix.ha.* </camel.osgi.export.pkg> <camel.osgi.export.service> - org.apache.camel.spi.ComponentResolver;component=atomix-client, + org.apache.camel.spi.ComponentResolver;component=atomix-map, org.apache.camel.spi.ComponentResolver;component=atomix-cluster, </camel.osgi.export.service> </properties> @@ -56,9 +57,19 @@ <!-- atomix --> <dependency> <groupId>io.atomix</groupId> - <artifactId>atomix-all</artifactId> + <artifactId>atomix</artifactId> <version>${atomix-version}</version> </dependency> + <dependency> + <groupId>io.atomix</groupId> + <artifactId>atomix-groups</artifactId> + <version>${atomix-version}</version> + </dependency> + <dependency> + <groupId>io.atomix.catalyst</groupId> + <artifactId>catalyst-netty</artifactId> + <version>${atomix-catalyst-version}</version> + </dependency> <!-- logging --> <dependency> @@ -88,6 +99,12 @@ <artifactId>camel-test-spring</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>java-hamcrest</artifactId> + <version>${hamcrest-version}</version> + <scope>test</scope> + </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/docs/atomix-client-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/docs/atomix-client-component.adoc b/components/camel-atomix/src/main/docs/atomix-client-component.adoc deleted file mode 100644 index 20ece42..0000000 --- a/components/camel-atomix/src/main/docs/atomix-client-component.adoc +++ /dev/null @@ -1,6 +0,0 @@ -## Atomix Client Component - -*Available as of Camel version 2.20* - -The camel atomix component allows you to work with Atomix, a fault-tolerant distributed coordination framework. - http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/docs/atomix-map-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/docs/atomix-map-component.adoc b/components/camel-atomix/src/main/docs/atomix-map-component.adoc new file mode 100644 index 0000000..653057e --- /dev/null +++ b/components/camel-atomix/src/main/docs/atomix-map-component.adoc @@ -0,0 +1,45 @@ +## atomix-map Component + +*Available as of Camel version 2.20* + +The camel atomix component allows you to work with Atomix, a fault-tolerant distributed coordination framework. + +// component options: START +The atomix-map component supports 5 options which are listed below. + + + +[width="100%",cols="2,5,^1,2",options="header"] +|======================================================================= +| Name | Description | Default | Type +| **configuration** (advanced) | The shared component configuration | | C +| **atomix** (producer) | The shared AtomixClient instance | | AtomixClient +| **nodes** (producer) | The nodes the AtomixClient should connect to | | List +| **configurationUri** (producer) | The path to the AtomixClient configuration | | String +| **resolveProperty Placeholders** (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean +|======================================================================= +// component options: END + +// endpoint options: START +The atomix-map endpoint is configured using URI syntax: + + atomix-map:mapName + +with the following path and query parameters: + +#### Path Parameters (1 parameters): + +[width="100%",cols="2,5,^1,2",options="header"] +|======================================================================= +| Name | Description | Default | Type +| **mapName** | *Required* The distributed map name | | String +|======================================================================= + +#### Query Parameters (1 parameters): + +[width="100%",cols="2,5,^1,2",options="header"] +|======================================================================= +| Name | Description | Default | Type +| **synchronous** (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean +|======================================================================= +// endpoint options: END http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixAsyncMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixAsyncMessageProcessor.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixAsyncMessageProcessor.java new file mode 100644 index 0000000..bfa5e95 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixAsyncMessageProcessor.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Message; + +@FunctionalInterface +public interface AtomixAsyncMessageProcessor { + /** + * @param message + * @param callback + * @return + * @throws Exception + */ + boolean process(Message message, AsyncCallback callback) throws Exception; +} http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java index d0e3203..1ffc06d 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java @@ -21,13 +21,18 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; +import io.atomix.Atomix; import io.atomix.catalyst.transport.Address; import io.atomix.catalyst.transport.Transport; import io.atomix.catalyst.transport.netty.NettyTransport; +import io.atomix.resource.ReadConsistency; import org.apache.camel.spi.UriParam; import org.apache.camel.util.ObjectHelper; -public class AtomixConfiguration { +public class AtomixConfiguration<T extends Atomix> implements Cloneable { + @UriParam + private T atomix; + @UriParam(javaType = "java.lang.String") private List<Address> nodes = Collections.emptyList(); @@ -37,9 +42,27 @@ public class AtomixConfiguration { @UriParam private String configurationUri; + @UriParam(label = "advanced") + private ReadConsistency readConsistency; + protected AtomixConfiguration() { } + // ***************************************** + // Properties + // ***************************************** + + public T getAtomix() { + return atomix; + } + + /** + * The Atomix instance to use + */ + public void setAtomix(T client) { + this.atomix = client; + } + public List<Address> getNodes() { return nodes; } @@ -78,4 +101,15 @@ public class AtomixConfiguration { public void setConfigurationUri(String configurationUri) { this.configurationUri = configurationUri; } + + public ReadConsistency getReadConsistency() { + return readConsistency; + } + + /** + * The read consistency level. + */ + public void setReadConsistency(ReadConsistency readConsistency) { + this.readConsistency = readConsistency; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixHelper.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixHelper.java new file mode 100644 index 0000000..959c894 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixHelper.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix; + +import io.atomix.copycat.server.storage.Storage; +import io.atomix.copycat.server.storage.StorageLevel; + +public final class AtomixHelper { + private AtomixHelper() { + } + + public static Storage inMemoryStorage() { + return Storage.builder().withStorageLevel(StorageLevel.MEMORY).build(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAsyncAtomixClientProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAsyncAtomixClientProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAsyncAtomixClientProducer.java new file mode 100644 index 0000000..931e9e4 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAsyncAtomixClientProducer.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Repeatable; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.atomix.AtomixAsyncMessageProcessor; +import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.util.AsyncProcessorHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractAsyncAtomixClientProducer<E extends AbstractAtomixClientEndpoint> extends DefaultProducer implements AsyncProcessor { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAsyncAtomixClientProducer.class); + private final Map<AtomixClientAction, AtomixAsyncMessageProcessor> processors; + + protected AbstractAsyncAtomixClientProducer(E endpoint) { + super(endpoint); + + this.processors = new HashMap<>(); + } + + @Override + protected void doStart() throws Exception { + for (final Method method : getClass().getDeclaredMethods()) { + AsyncInvokeOnHeaders annotations = method.getAnnotation(AsyncInvokeOnHeaders.class); + if (annotations != null) { + for (AsyncInvokeOnHeader annotation : annotations.value()) { + bind(annotation, method); + } + } else { + AsyncInvokeOnHeader annotation = method.getAnnotation(AsyncInvokeOnHeader.class); + if (annotation != null) { + bind(annotation, method); + } + } + } + + super.doStart(); + } + + @Override + public void process(Exchange exchange) throws Exception { + AsyncProcessorHelper.process(this, exchange); + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + final Message message = exchange.getIn(); + final AtomixClientAction action = getAction(message); + + AtomixAsyncMessageProcessor processor = this.processors.get(action); + if (processor != null) { + try { + return processor.process(message, callback); + } catch (Exception e) { + throw new RuntimeCamelException(e); + } + } else { + throw new RuntimeCamelException("No handler for action " + action); + } + } + + @SuppressWarnings("unchecked") + protected E getAtomixEndpoint() { + return (E)super.getEndpoint(); + } + + protected abstract AtomixClientAction getAction(Message message); + + // ************************************ + // Binding helpers + // ************************************ + + private void bind(AsyncInvokeOnHeader annotation, final Method method) { + if (method.getParameterCount() == 2) { + method.setAccessible(true); + + if (!Message.class.isAssignableFrom(method.getParameterTypes()[0])) { + throw new IllegalArgumentException("First argument should be of type Message"); + } + if (!AsyncCallback.class.isAssignableFrom(method.getParameterTypes()[1])) { + throw new IllegalArgumentException("Second argument should be of type AsyncCallback"); + } + + LOGGER.debug("bind key={}, class={}, method={}", + annotation.value(), this.getClass(), method.getName()); + + this.processors.put(annotation.value(), (m, c) -> (boolean)method.invoke(this, m, c)); + } else { + throw new IllegalArgumentException( + "Illegal number of parameters for method: " + method.getName() + ", required: 2, found: " + method.getParameterCount() + ); + } + } + + // ************************************ + // Annotations + // ************************************ + + @Repeatable(AsyncInvokeOnHeaders.class) + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + public @interface AsyncInvokeOnHeader { + AtomixClientAction value(); + } + + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + public @interface AsyncInvokeOnHeaders { + AsyncInvokeOnHeader[] value(); + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientComponent.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientComponent.java new file mode 100644 index 0000000..66fd91b --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientComponent.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client; + +import java.util.List; + +import io.atomix.AtomixClient; +import io.atomix.catalyst.transport.Address; +import org.apache.camel.CamelContext; +import org.apache.camel.impl.DefaultComponent; +import org.apache.camel.spi.Metadata; + +public abstract class AbstractAtomixClientComponent<C extends AtomixClientConfiguration> extends DefaultComponent { + @Metadata(label = "advanced") + private C configuration; + + protected AbstractAtomixClientComponent(C configuration) { + this.configuration = configuration; + } + + protected AbstractAtomixClientComponent(CamelContext camelContext, C configuration) { + super(camelContext); + + this.configuration = configuration; + } + + // ***************************************** + // Properties + // ***************************************** + + public C getConfiguration() { + return configuration; + } + + /** + * The shared component configuration + */ + public void setConfiguration(C configuration) { + this.configuration = configuration; + } + + public AtomixClient getAtomix() { + return configuration.getAtomix(); + } + + /** + * The shared AtomixClient instance + */ + public void setAtomix(AtomixClient client) { + configuration.setAtomix(client); + } + + public List<Address> getNodes() { + return configuration.getNodes(); + } + + /** + * The nodes the AtomixClient should connect to + */ + public void setNodes(List<Address> nodes) { + configuration.setNodes(nodes); + } + + public void setNodes(String nodes) { + configuration.setNodes(nodes); + } + + public String getConfigurationUri() { + return configuration.getConfigurationUri(); + } + + + /** + * The path to the AtomixClient configuration + */ + public void setConfigurationUri(String configurationUri) { + configuration.setConfigurationUri(configurationUri); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientConsumer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientConsumer.java new file mode 100644 index 0000000..8f800e9 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientConsumer.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client; + +import org.apache.camel.Processor; +import org.apache.camel.impl.DefaultConsumer; + +public abstract class AbstractAtomixClientConsumer<E extends AbstractAtomixClientEndpoint> extends DefaultConsumer { + protected AbstractAtomixClientConsumer(E endpoint, Processor processor) { + super(endpoint, processor); + } + + @SuppressWarnings("unchecked") + protected E getAtomixEndpoint() { + return (E)super.getEndpoint(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientEndpoint.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientEndpoint.java new file mode 100644 index 0000000..5a83873 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientEndpoint.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client; + +import io.atomix.AtomixClient; +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; + + +public abstract class AbstractAtomixClientEndpoint<T extends AbstractAtomixClientComponent, C extends AtomixClientConfiguration> extends DefaultEndpoint { + private final C configuration; + private AtomixClient atomix; + + protected AbstractAtomixClientEndpoint(String uri, T component, C configuration) { + super(uri, component); + + this.configuration = configuration; + } + + @Override + public boolean isSingleton() { + return true; + } + + @Override + public Producer createProducer() throws Exception { + throw new UnsupportedOperationException("Producer not supported"); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + throw new UnsupportedOperationException("Consumer not supported"); + } + + @Override + protected void doStart() throws Exception { + if (atomix == null) { + atomix = AtomixClientHelper.createClient(getCamelContext(), configuration); + atomix.connect(configuration.getNodes()).join(); + } + + super.doStart(); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + + if (atomix != null) { + atomix.close(); + } + } + + // ********************************** + // Helpers for implementations + // ********************************* + + @SuppressWarnings("unchecked") + public T getAtomixComponent() { + return (T)super.getComponent(); + } + + public C getAtomixConfiguration() { + return this.configuration; + } + + public AtomixClient getAtomix() { + return atomix; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java new file mode 100644 index 0000000..16a7866 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client; + +import org.apache.camel.impl.HeaderSelectorProducer; + +public abstract class AbstractAtomixClientProducer<E extends AbstractAtomixClientEndpoint> extends HeaderSelectorProducer { + protected AbstractAtomixClientProducer(E endpoint, String header) { + super(endpoint, header); + } + + @SuppressWarnings("unchecked") + protected E getAtomixEndpoint() { + return (E)super.getEndpoint(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientAction.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientAction.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientAction.java new file mode 100644 index 0000000..5bf641f --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientAction.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client; + +public enum AtomixClientAction { + PUT, + PUT_IF_ABSENT, + GET, + CLEAR, + SIZE, + CONTAINS_KEY, + CONTAINS_VALUE, + IS_EMPTY, + ENTRY_SET, + REMOVE, + REPLACE, + VALUES +} http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientComponent.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientComponent.java deleted file mode 100644 index 775315b..0000000 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientComponent.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.atomix.client; - -import java.util.Map; - -import org.apache.camel.CamelContext; -import org.apache.camel.Endpoint; -import org.apache.camel.impl.DefaultComponent; - -public class AtomixClientComponent extends DefaultComponent { - public AtomixClientComponent() { - } - - public AtomixClientComponent(CamelContext camelContext) { - super(camelContext); - } - - @Override - protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - throw new UnsupportedOperationException("Not yet implemented"); - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java index d3134c4..0b8a0d8 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java @@ -16,11 +16,8 @@ */ package org.apache.camel.component.atomix.client; +import io.atomix.AtomixClient; import org.apache.camel.component.atomix.AtomixConfiguration; -import org.apache.camel.spi.UriParams; -@UriParams -public class AtomixClientConfiguration extends AtomixConfiguration { - public AtomixClientConfiguration() { - } +public class AtomixClientConfiguration extends AtomixConfiguration<AtomixClient> implements Cloneable { } http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConstants.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConstants.java new file mode 100644 index 0000000..2fccd8e --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConstants.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client; + +public final class AtomixClientConstants { + public static final String RESOURCE_NAME = "CamelAtomixResourceName"; + public static final String RESOURCE_ACTION = "CamelAtomixResourceAction"; + public static final String RESOURCE_KEY = "CamelAtomixResourceKey"; + public static final String RESOURCE_VALUE = "CamelAtomixResourceValue"; + public static final String RESOURCE_DEFAULT_VALUE = "CamelAtomixResourceDefaultValue"; + public static final String RESOURCE_OLD_VALUE = "CamelAtomixResourceOldValue"; + public static final String RESOURCE_ACTION_HAS_RESULT = "CamelAtomixResourceActionHasResult"; + public static final String RESOURCE_TTL = "CamelAtomixResourceTTL"; + public static final String RESOURCE_READ_CONSISTENCY = "CamelAtomixResourceReadConsistency"; + public static final String EVENT_TYPE = "CamelAtomixEventType"; + + private AtomixClientConstants() { + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientHelper.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientHelper.java new file mode 100644 index 0000000..bbe9358 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientHelper.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client; + +import java.io.InputStream; +import java.util.Properties; + +import io.atomix.AtomixClient; +import org.apache.camel.CamelContext; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ResourceHelper; + +public final class AtomixClientHelper { + private AtomixClientHelper() { + } + + public static AtomixClient createClient(CamelContext camelContext, AtomixClientConfiguration configuration) throws Exception { + AtomixClient atomix = configuration.getAtomix(); + + if (atomix == null) { + final AtomixClient.Builder atomixBuilder; + + String uri = configuration.getConfigurationUri(); + if (ObjectHelper.isNotEmpty(uri)) { + uri = camelContext.resolvePropertyPlaceholders(uri); + try (InputStream is = ResourceHelper.resolveMandatoryResourceAsInputStream(camelContext, uri)) { + Properties properties = new Properties(); + properties.load(is); + + atomixBuilder = AtomixClient.builder(properties); + } + } else { + atomixBuilder = AtomixClient.builder(); + } + + if (configuration.getTransport() != null) { + atomixBuilder.withTransport( + camelContext.getInjector().newInstance(configuration.getTransport()) + ); + } + + atomix = atomixBuilder.build(); + } + + return atomix; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapComponent.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapComponent.java new file mode 100644 index 0000000..a853e7d --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapComponent.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.map; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.component.atomix.client.AbstractAtomixClientComponent; + +public class AtomixClientMapComponent extends AbstractAtomixClientComponent<AtomixClientMapConfiguration> { + public AtomixClientMapComponent() { + super(new AtomixClientMapConfiguration()); + } + + public AtomixClientMapComponent(CamelContext camelContext) { + super(camelContext, new AtomixClientMapConfiguration()); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + AtomixClientMapConfiguration configuration = super.getConfiguration().copy(); + setProperties(configuration, parameters); + + return new AtomixClientMapEndpoint(uri, this, configuration, remaining); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConfiguration.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConfiguration.java new file mode 100644 index 0000000..f7f50a2 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConfiguration.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.map; + +import io.atomix.collections.DistributedMap; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.atomix.client.AtomixClientAction; +import org.apache.camel.component.atomix.client.AtomixClientConfiguration; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriParams; + +@UriParams +public class AtomixClientMapConfiguration extends AtomixClientConfiguration { + @UriParam(defaultValue = "PUT") + private AtomixClientAction defaultAction = AtomixClientAction.PUT; + @UriParam + private Long ttl; + @UriParam + private String resultHeader; + @UriParam(label = "advanced") + private DistributedMap.Config config = new DistributedMap.Config(); + @UriParam(label = "advanced") + private DistributedMap.Options options = new DistributedMap.Options(); + + // **************************************** + // Properties + // **************************************** + + public AtomixClientAction getDefaultAction() { + return defaultAction; + } + + /** + * The default action. + */ + public void setDefaultAction(AtomixClientAction defaultAction) { + this.defaultAction = defaultAction; + } + + public Long getTtl() { + return ttl; + } + + /** + * The resource ttl. + */ + public void setTtl(Long ttl) { + this.ttl = ttl; + } + + public String getResultHeader() { + return resultHeader; + } + + /** + * The header that wil carry the result. + */ + public void setResultHeader(String resultHeader) { + this.resultHeader = resultHeader; + } + + public DistributedMap.Config getConfig() { + return config; + } + + /** + * The cluster wide map config + */ + public void setConfig(DistributedMap.Config config) { + this.config = config; + } + + public DistributedMap.Options getOptions() { + return options; + } + + /** + * The local map options + */ + public void setOptions(DistributedMap.Options options) { + this.options = options; + } + + // **************************************** + // Copy + // **************************************** + + public AtomixClientMapConfiguration copy() { + try { + return (AtomixClientMapConfiguration) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeCamelException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConsumer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConsumer.java new file mode 100644 index 0000000..6e7b410 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConsumer.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.map; + +import java.util.ArrayList; +import java.util.List; + +import io.atomix.catalyst.concurrent.Listener; +import io.atomix.collections.DistributedMap; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.atomix.client.AbstractAtomixClientConsumer; +import org.apache.camel.component.atomix.client.AtomixClientConstants; + +public class AtomixClientMapConsumer extends AbstractAtomixClientConsumer<AtomixClientMapEndpoint> { + private final List<Listener<DistributedMap.EntryEvent<Object, Object>>> listeners; + private DistributedMap<Object, Object> map; + + public AtomixClientMapConsumer(AtomixClientMapEndpoint endpoint, Processor processor) { + super(endpoint, processor); + this.listeners = new ArrayList<>(); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + this.map = getAtomixEndpoint() + .getAtomix() + .getMap( + getAtomixEndpoint().getMapName(), + getAtomixEndpoint().getAtomixConfiguration().getConfig(), + getAtomixEndpoint().getAtomixConfiguration().getOptions()) + .join(); + + this.listeners.add(this.map.onAdd(this::onEvent).join()); + this.listeners.add(this.map.onRemove(this::onEvent).join()); + this.listeners.add(this.map.onUpdate(this::onEvent).join()); + } + + @Override + protected void doStop() throws Exception { + // close listeners + listeners.forEach(Listener::close); + + // close the map + if (this.map == null) { + this.map.close().join(); + this.map = null; + } + + super.doStart(); + } + + // ******************************************** + // Event handler + // ******************************************** + + private void onEvent(DistributedMap.EntryEvent<Object, Object> event) { + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, event.type()); + exchange.getIn().setHeader(AtomixClientConstants.RESOURCE_KEY, event.entry().getKey()); + exchange.getIn().setBody(event.entry().getValue()); + + try { + getProcessor().process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapEndpoint.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapEndpoint.java new file mode 100644 index 0000000..e8525a0 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapEndpoint.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.map; + +import org.apache.camel.Producer; +import org.apache.camel.component.atomix.client.AbstractAtomixClientEndpoint; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriPath; + +@UriEndpoint(firstVersion = "2.20.0", scheme = "atomix-map", title = "atomix-map", syntax = "atomix-map:mapName", producerOnly = true, label = "clustering") +class AtomixClientMapEndpoint extends AbstractAtomixClientEndpoint<AtomixClientMapComponent, AtomixClientMapConfiguration> { + + @UriPath(description = "The distributed map name") + @Metadata(required = "true") + private final String mapName; + + public AtomixClientMapEndpoint(String uri, AtomixClientMapComponent component, AtomixClientMapConfiguration configuration, String mapName) { + super(uri, component, configuration); + + this.mapName = mapName; + } + + @Override + public Producer createProducer() throws Exception { + return new AtomixClientMapProducer(this, mapName); + } + + /* + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return new AtomixClientMapConsumer(this, processor); + } + */ + + public String getMapName() { + return mapName; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapProducer.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapProducer.java new file mode 100644 index 0000000..70dc656 --- /dev/null +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapProducer.java @@ -0,0 +1,364 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atomix.client.map; + +import java.time.Duration; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import io.atomix.collections.DistributedMap; +import io.atomix.resource.ReadConsistency; +import org.apache.camel.AsyncCallback; +import org.apache.camel.Message; +import org.apache.camel.component.atomix.client.AbstractAsyncAtomixClientProducer; +import org.apache.camel.component.atomix.client.AtomixClientAction; +import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.ObjectHelper; + +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_DEFAULT_VALUE; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_KEY; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_OLD_VALUE; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_TTL; +import static org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_VALUE; + +final class AtomixClientMapProducer extends AbstractAsyncAtomixClientProducer<AtomixClientMapEndpoint> { + private final String mapName; + private final ConcurrentMap<String, DistributedMap<Object, Object>> maps; + private final AtomixClientMapConfiguration configuration; + + protected AtomixClientMapProducer(AtomixClientMapEndpoint endpoint, String mapName) { + super(endpoint); + this.mapName = ObjectHelper.notNull(mapName, "map name"); + this.configuration = endpoint.getAtomixConfiguration(); + this.maps = new ConcurrentHashMap<>(); + } + + @Override + protected AtomixClientAction getAction(Message message) { + return message.getHeader(RESOURCE_ACTION, configuration::getDefaultAction, AtomixClientAction.class); + } + + // ********************************* + // Handlers + // ********************************* + + @AsyncInvokeOnHeader(AtomixClientAction.PUT) + boolean onPut(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getMap(message); + final Object key = ExchangeHelper.getMandatoryHeader(message, RESOURCE_KEY, Object.class); + final Duration ttl = message.getHeader(RESOURCE_TTL, configuration::getTtl, Duration.class); + + if (ttl != null) { + map.put( + key, + message.getMandatoryBody(), + ttl + ).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.put( + key, + message.getMandatoryBody() + ).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @AsyncInvokeOnHeader(AtomixClientAction.PUT_IF_ABSENT) + boolean onPutIfAbsent(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getMap(message); + final Object key = ExchangeHelper.getMandatoryHeader(message, RESOURCE_KEY, Object.class); + final Duration ttl = message.getHeader(RESOURCE_TTL, configuration::getTtl, Duration.class); + + if (ttl != null) { + map.putIfAbsent( + key, + message.getMandatoryBody(), + ttl + ).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.putIfAbsent( + key, + message.getMandatoryBody() + ).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @AsyncInvokeOnHeader(AtomixClientAction.GET) + boolean onGet(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getMap(message); + final Object key = ExchangeHelper.getMandatoryHeader(message, RESOURCE_KEY, Object.class); + final Object defaultValue = message.getHeader(RESOURCE_DEFAULT_VALUE); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + + if (consistency != null) { + if (defaultValue != null) { + map.getOrDefault(key, defaultValue, consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.get(key, consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } + } else { + if (defaultValue != null) { + map.getOrDefault(key, defaultValue).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.get(key).thenAccept( + result -> processResult(message, callback, result) + ); + } + } + + return false; + } + + @AsyncInvokeOnHeader(AtomixClientAction.CLEAR) + boolean onClear(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getMap(message); + + map.clear().thenAccept( + result -> processResult(message, callback, result) + ); + + return false; + } + + @AsyncInvokeOnHeader(AtomixClientAction.SIZE) + boolean onSize(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getMap(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + + if (consistency != null) { + map.size(consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.size().thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @AsyncInvokeOnHeader(AtomixClientAction.IS_EMPTY) + boolean onIsEmpty(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getMap(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + + if (consistency != null) { + map.isEmpty(consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.isEmpty().thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @AsyncInvokeOnHeader(AtomixClientAction.ENTRY_SET) + boolean onEntrySet(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getMap(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + + if (consistency != null) { + map.entrySet(consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.entrySet().thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @AsyncInvokeOnHeader(AtomixClientAction.VALUES) + boolean onValues(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getMap(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + + if (consistency != null) { + map.values(consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.values().thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @AsyncInvokeOnHeader(AtomixClientAction.CONTAINS_KEY) + boolean onContainsKey(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getMap(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class); + + ObjectHelper.notNull(key, RESOURCE_KEY); + + if (consistency != null) { + map.containsKey(key, consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.containsKey(key).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @AsyncInvokeOnHeader(AtomixClientAction.CONTAINS_VALUE) + boolean onContainsValue(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getMap(message); + final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class); + final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + + ObjectHelper.notNull(value, RESOURCE_VALUE); + + if (consistency != null) { + map.containsValue(value, consistency).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.containsValue(value).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @AsyncInvokeOnHeader(AtomixClientAction.REMOVE) + boolean onRemove(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getMap(message); + final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class); + final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + + ObjectHelper.notNull(key, RESOURCE_VALUE); + + if (value != null) { + map.remove(key, value).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.remove(key).thenAccept( + result -> processResult(message, callback, result) + ); + } + + return false; + } + + @AsyncInvokeOnHeader(AtomixClientAction.REPLACE) + boolean onReplace(Message message, AsyncCallback callback) throws Exception { + final DistributedMap<Object, Object> map = getMap(message); + final Duration ttl = message.getHeader(RESOURCE_TTL, configuration::getTtl, Duration.class); + final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class); + final Object newValue = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class); + final Object oldValue = message.getHeader(RESOURCE_OLD_VALUE, Object.class); + + ObjectHelper.notNull(key, RESOURCE_VALUE); + ObjectHelper.notNull(newValue, RESOURCE_VALUE); + + if (ttl != null) { + if (oldValue != null) { + map.replace(key, oldValue, newValue, ttl).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.replace(key, newValue, ttl).thenAccept( + result -> processResult(message, callback, result) + ); + } + } else { + if (oldValue != null) { + map.replace(key, oldValue, newValue).thenAccept( + result -> processResult(message, callback, result) + ); + } else { + map.replace(key, newValue).thenAccept( + result -> processResult(message, callback, result) + ); + } + } + + return false; + } + + // ********************************* + // Helpers + // ********************************* + + private void processResult(Message message, AsyncCallback callback, Object result) { + if (result != null && !(result instanceof Void)) { + message.setHeader(RESOURCE_ACTION_HAS_RESULT, true); + + String resultHeader = configuration.getResultHeader(); + if (resultHeader != null) { + message.setHeader(resultHeader, result); + } else { + message.setBody(result); + } + } else { + message.setHeader(RESOURCE_ACTION_HAS_RESULT, false); + } + + callback.done(false); + } + + private DistributedMap<Object, Object> getMap(Message message) { + return maps.computeIfAbsent( + message.getHeader(RESOURCE_NAME, getAtomixEndpoint().getMapName(), String.class), + name -> { + return getAtomixEndpoint() + .getAtomix() + .getMap( + name, + getAtomixEndpoint().getAtomixConfiguration().getConfig(), + getAtomixEndpoint().getAtomixConfiguration().getOptions()) + .join(); + } + ); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java index 8a732e1..b05230f 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java @@ -17,6 +17,7 @@ package org.apache.camel.component.atomix.cluster; import io.atomix.AtomixReplica; +import io.atomix.catalyst.transport.Transport; import io.atomix.copycat.server.storage.StorageLevel; import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.atomix.AtomixConfiguration; @@ -24,65 +25,70 @@ import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; @UriParams -public class AtomixClusterConfiguration extends AtomixConfiguration implements Cloneable { +public class AtomixClusterConfiguration extends AtomixConfiguration<AtomixReplica> implements Cloneable { @UriParam - private String storagePath; - - @UriParam(defaultValue = "MEMORY") - private StorageLevel storageLevel = StorageLevel.MEMORY; + private Class<? extends Transport> clientTransport; @UriParam - private String replicaRef; + private Class<? extends Transport> serverTransport; @UriParam - private AtomixReplica replica; + private String storagePath; + + @UriParam(defaultValue = "MEMORY") + private StorageLevel storageLevel = StorageLevel.MEMORY; public AtomixClusterConfiguration() { } - public String getStoragePath() { - return storagePath; + // ****************************************** + // Properties + // ****************************************** + + + public Class<? extends Transport> getClientTransport() { + return clientTransport; } /** - * Sets the log directory. + * The client transport */ - public void setStoragePath(String storagePath) { - this.storagePath = storagePath; + public void setClientTransport(Class<? extends Transport> clientTransport) { + this.clientTransport = clientTransport; } - public StorageLevel getStorageLevel() { - return storageLevel; + public Class<? extends Transport> getServerTransport() { + return serverTransport; } /** - * Sets the log storage level. + * The server transport */ - public void setStorageLevel(StorageLevel storageLevel) { - this.storageLevel = storageLevel; + public void setServerTransport(Class<? extends Transport> serverTransport) { + this.serverTransport = serverTransport; } - public String getReplicaRef() { - return replicaRef; + public String getStoragePath() { + return storagePath; } /** - * Set the reference of an instance of {@link AtomixReplica}. + * Sets the log directory. */ - public void setReplicaRef(String clusterref) { - this.replicaRef = clusterref; + public void setStoragePath(String storagePath) { + this.storagePath = storagePath; } - public AtomixReplica getReplica() { - return replica; + public StorageLevel getStorageLevel() { + return storageLevel; } /** - * Set an instance of {@link AtomixReplica}. + * Sets the log storage level. */ - public void setReplica(AtomixReplica replica) { - this.replica = replica; + public void setStorageLevel(StorageLevel storageLevel) { + this.storageLevel = storageLevel; } // **************************************** http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterHelper.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterHelper.java index 777a4f6..0530495 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterHelper.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterHelper.java @@ -23,7 +23,6 @@ import io.atomix.AtomixReplica; import io.atomix.catalyst.transport.Address; import io.atomix.copycat.server.storage.Storage; import org.apache.camel.CamelContext; -import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ResourceHelper; @@ -36,41 +35,47 @@ public final class AtomixClusterHelper { } public static AtomixReplica createReplica(CamelContext camelContext, Address address, AtomixClusterConfiguration configuration) throws Exception { - AtomixReplica atomix = configuration.getReplica(); + AtomixReplica atomix = configuration.getAtomix(); if (atomix == null) { - if (configuration.getReplicaRef() != null) { - atomix = CamelContextHelper.mandatoryLookup(camelContext, configuration.getReplicaRef(), AtomixReplica.class); - } else { - final AtomixReplica.Builder atomixBuilder; + final AtomixReplica.Builder atomixBuilder; - String uri = configuration.getConfigurationUri(); - if (ObjectHelper.isNotEmpty(uri)) { - uri = camelContext.resolvePropertyPlaceholders(uri); - try (InputStream is = ResourceHelper.resolveMandatoryResourceAsInputStream(camelContext, uri)) { - Properties properties = new Properties(); - properties.load(is); + String uri = configuration.getConfigurationUri(); + if (ObjectHelper.isNotEmpty(uri)) { + uri = camelContext.resolvePropertyPlaceholders(uri); + try (InputStream is = ResourceHelper.resolveMandatoryResourceAsInputStream(camelContext, uri)) { + Properties properties = new Properties(); + properties.load(is); - atomixBuilder = AtomixReplica.builder(address, properties); - } - } else { - atomixBuilder = AtomixReplica.builder(address); + atomixBuilder = AtomixReplica.builder(address, properties); } + } else { + atomixBuilder = AtomixReplica.builder(address); + } - Storage.Builder storageBuilder = Storage.builder(); - ObjectHelper.ifNotEmpty(configuration.getStorageLevel(), storageBuilder::withStorageLevel); - ObjectHelper.ifNotEmpty(configuration.getStoragePath(), storageBuilder::withDirectory); - - atomixBuilder.withStorage(storageBuilder.build()); + Storage.Builder storageBuilder = Storage.builder(); + ObjectHelper.ifNotEmpty(configuration.getStorageLevel(), storageBuilder::withStorageLevel); + ObjectHelper.ifNotEmpty(configuration.getStoragePath(), storageBuilder::withDirectory); - if (configuration.getTransport() != null) { - atomixBuilder.withTransport( - camelContext.getInjector().newInstance(configuration.getTransport()) - ); - } + atomixBuilder.withStorage(storageBuilder.build()); - atomix = atomixBuilder.build(); + if (configuration.getTransport() != null) { + atomixBuilder.withTransport( + camelContext.getInjector().newInstance(configuration.getTransport()) + ); + } + if (configuration.getClientTransport() != null) { + atomixBuilder.withClientTransport( + camelContext.getInjector().newInstance(configuration.getClientTransport()) + ); } + if (configuration.getServerTransport() != null) { + atomixBuilder.withServerTransport( + camelContext.getInjector().newInstance(configuration.getServerTransport()) + ); + } + + atomix = atomixBuilder.build(); } return atomix; http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java ---------------------------------------------------------------------- diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java index 6a6a750..bf49eab 100644 --- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java +++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java @@ -18,7 +18,6 @@ package org.apache.camel.component.atomix.ha; import java.util.List; -import io.atomix.Atomix; import io.atomix.AtomixReplica; import io.atomix.catalyst.transport.Address; import io.atomix.catalyst.transport.Transport; @@ -109,20 +108,12 @@ public final class AtomixClusterService extends AbstractCamelClusterService<Atom configuration.setTransport(transport); } - public String getReplicaRef() { - return configuration.getReplicaRef(); + public AtomixReplica getAtomix() { + return configuration.getAtomix(); } - public void setReplicaRef(String clusterref) { - configuration.setReplicaRef(clusterref); - } - - public Atomix getReplica() { - return configuration.getReplica(); - } - - public void setReplica(AtomixReplica replica) { - configuration.setReplica(replica); + public void setAtomix(AtomixReplica atomix) { + configuration.setAtomix(atomix); } public String getConfigurationUri() { @@ -139,23 +130,18 @@ public final class AtomixClusterService extends AbstractCamelClusterService<Atom @Override protected void doStart() throws Exception { - // Assume that if addresses are provided the cluster needs be bootstrapped. - if (ObjectHelper.isNotEmpty(configuration.getNodes())) { - LOGGER.debug("Bootstrap cluster on address {} for nodes: {}", address, configuration.getNodes()); - getOrCreateAtomix().bootstrap(configuration.getNodes()).join(); - LOGGER.debug("Bootstrap cluster done"); - } + // instantiate a new atomix replica + getOrCreateReplica(); super.doStart(); } @Override protected AtomixClusterView createView(String namespace) throws Exception { - return new AtomixClusterView(this, namespace, getOrCreateAtomix()); + return new AtomixClusterView(this, namespace, getOrCreateReplica()); } - - private AtomixReplica getOrCreateAtomix() throws Exception { + private AtomixReplica getOrCreateReplica() throws Exception { if (atomix == null) { // Validate parameters ObjectHelper.notNull(getCamelContext(), "Camel Context");