Repository: camel Updated Branches: refs/heads/master 50bd595e9 -> 09fb6426d
CAMEL-10986: camel-zookeeper-master - Donation of the master component from fabric8 v1 Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/09fb6426 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/09fb6426 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/09fb6426 Branch: refs/heads/master Commit: 09fb6426d25ec9e54ba240d3af825454aa1a9216 Parents: 50bd595 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Mar 11 10:14:49 2017 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Mar 11 10:26:43 2017 +0100 ---------------------------------------------------------------------- .../main/docs/zookeeper-master-component.adoc | 41 +++- .../zookeepermaster/CamelNodeState.java | 6 + .../zookeepermaster/ContainerIdFactory.java | 29 +++ .../DefaultContainerIdFactory.java | 27 +++ .../zookeepermaster/MasterComponent.java | 15 +- .../zookeepermaster/MasterConsumer.java | 85 ++++++--- .../zookeepermaster/MasterEndpoint.java | 57 +++--- .../zookeepermaster/ZKComponentSupport.java | 185 ------------------- .../ZookeeperComponentSupport.java | 185 +++++++++++++++++++ .../zookeepermaster/MasterEndpointTest.java | 2 +- components/readme.adoc | 2 +- .../MasterComponentConfiguration.java | 14 ++ .../camel-spring-boot-dependencies/pom.xml | 5 + 13 files changed, 412 insertions(+), 241 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/09fb6426/components/camel-zookeeper-master/src/main/docs/zookeeper-master-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper-master/src/main/docs/zookeeper-master-component.adoc b/components/camel-zookeeper-master/src/main/docs/zookeeper-master-component.adoc index be3530f..b115b4c 100644 --- a/components/camel-zookeeper-master/src/main/docs/zookeeper-master-component.adoc +++ b/components/camel-zookeeper-master/src/main/docs/zookeeper-master-component.adoc @@ -33,13 +33,14 @@ Where endpoint is any Camel endpoint you want to run in master/slave mode. ### Options // component options: START -The ZooKeeper Master component supports 6 options which are listed below. +The ZooKeeper Master component supports 7 options which are listed below. [width="100%",cols="2,1,1m,1m,5",options="header"] |======================================================================= | Name | Group | Default | Java Type | Description +| containerIdFactory | consumer | | ContainerIdFactory | To use a custom ContainerIdFactory for creating container ids. | zkRoot | consumer | /camel/zookeepermaster/clusters/master | String | The root path to use in zookeeper where information is stored which nodes are master/slave etc. Will by default use: /camel/zookeepermaster/clusters/master | curator | advanced | | CuratorFramework | To use a custom configured CuratorFramework as connection to zookeeper ensemble. | maximumConnectionTimeout | consumer | 10000 | int | Timeout in millis to use when connecting to the zookeeper ensemble @@ -52,7 +53,7 @@ The ZooKeeper Master component supports 6 options which are listed below. // endpoint options: START The ZooKeeper Master endpoint is configured using URI syntax: - zookeeper-master:name:endpoint + zookeeper-master:groupName:consumerEndpointUri with the following path and query parameters: @@ -61,8 +62,8 @@ with the following path and query parameters: [width="100%",cols="2,1,1m,6",options="header"] |======================================================================= | Name | Default | Java Type | Description -| name | | String | *Required* The name of the cluster group to use -| endpoint | | String | *Required* The Camel endpoint to use in master/slave mode +| groupName | | String | *Required* The name of the cluster group to use +| consumerEndpointUri | | String | *Required* The consumer endpoint to use in master/slave mode |======================================================================= #### Query Parameters (4 parameters): @@ -77,6 +78,38 @@ with the following path and query parameters: |======================================================================= // endpoint options: END +### Example + +You can protect a clustered Camel application to only consume files from one active node. + + +[source,java] +---- + // the file endpoint we want to consume from + String url = "file:target/inbox?delete=true"; + + // use the zookeeper master component in the clustered group named myGroup + // to run a master/slave mode in the following Camel url + from("zookeeper-master:myGroup:" + url) + .log(name + " - Received file: ${file:name}") + .delay(delay) + .log(name + " - Done file: ${file:name}") + .to("file:target/outbox"); + } +---- + +ZooKeeper will by default connect to `localhost:2181`, but you can configure this on the component level. + +[source,java] +---- + MasterComponent master = new MasterComponent(); + master.setZooKeeperUrl("myzookeeper:2181"); +---- + +However you can also configure the url of the ZooKeeper ensemble using environment variables. + + export ZOOKEEPER_URL = "myzookeeper:2181" + ### See Also http://git-wip-us.apache.org/repos/asf/camel/blob/09fb6426/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/CamelNodeState.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/CamelNodeState.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/CamelNodeState.java index d501e12..f8f2704 100644 --- a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/CamelNodeState.java +++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/CamelNodeState.java @@ -17,6 +17,8 @@ package org.apache.camel.component.zookeepermaster; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.camel.component.zookeepermaster.group.NodeState; public class CamelNodeState extends NodeState { @@ -34,4 +36,8 @@ public class CamelNodeState extends NodeState { super(id); } + public CamelNodeState(String id, String container) { + super(id, container); + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/09fb6426/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/ContainerIdFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/ContainerIdFactory.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/ContainerIdFactory.java new file mode 100644 index 0000000..c0ba659 --- /dev/null +++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/ContainerIdFactory.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.zookeepermaster; + +/** + * Factory to create container ids which are used in the ZooKeeper cluster group + * as unique ids among all the nodes that runs in the cluster. + * <p/> + * A custom factory can be used that uses hostname or some other unique way of identifying the container + */ +public interface ContainerIdFactory { + + String newContainerId(); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/09fb6426/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/DefaultContainerIdFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/DefaultContainerIdFactory.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/DefaultContainerIdFactory.java new file mode 100644 index 0000000..a3d5bca --- /dev/null +++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/DefaultContainerIdFactory.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.zookeepermaster; + +import java.util.UUID; + +public class DefaultContainerIdFactory implements ContainerIdFactory { + + @Override + public String newContainerId() { + return System.getProperty("runtime.id", UUID.randomUUID().toString()); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/09fb6426/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterComponent.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterComponent.java index f2d5d63..59749ae 100644 --- a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterComponent.java +++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterComponent.java @@ -28,11 +28,24 @@ import org.apache.camel.util.URISupport; * point in time with all other JVMs being hot standbys which wait until the master JVM dies before * taking over to provide high availability of a single consumer. */ -public class MasterComponent extends ZKComponentSupport { +public class MasterComponent extends ZookeeperComponentSupport { + + private ContainerIdFactory containerIdFactory = new DefaultContainerIdFactory(); @Metadata(defaultValue = "/camel/zookeepermaster/clusters/master") private String zkRoot = "/camel/zookeepermaster/clusters/master"; + public ContainerIdFactory getContainerIdFactory() { + return containerIdFactory; + } + + /** + * To use a custom ContainerIdFactory for creating container ids. + */ + public void setContainerIdFactory(ContainerIdFactory containerIdFactory) { + this.containerIdFactory = containerIdFactory; + } + public String getZkRoot() { return zkRoot; } http://git-wip-us.apache.org/repos/asf/camel/blob/09fb6426/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterConsumer.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterConsumer.java index ab3608c..067d6a8 100644 --- a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterConsumer.java +++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterConsumer.java @@ -16,9 +16,15 @@ */ package org.apache.camel.component.zookeepermaster; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.SuspendableService; +import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedOperation; +import org.apache.camel.api.management.ManagedResource; import org.apache.camel.component.zookeepermaster.group.Group; import org.apache.camel.component.zookeepermaster.group.GroupListener; import org.apache.camel.impl.DefaultConsumer; @@ -29,6 +35,7 @@ import org.slf4j.LoggerFactory; /** * A consumer which is only really active while it holds the master lock */ +@ManagedResource(description = "Managed ZooKeeper Master Consumer") public class MasterConsumer extends DefaultConsumer implements GroupListener { private static final transient Logger LOG = LoggerFactory.getLogger(MasterConsumer.class); @@ -37,29 +44,58 @@ public class MasterConsumer extends DefaultConsumer implements GroupListener { private Consumer delegate; private SuspendableService delegateService; private final Group<CamelNodeState> singleton; + private volatile CamelNodeState thisNodeState; public MasterConsumer(MasterEndpoint endpoint, Processor processor) { super(endpoint, processor); this.endpoint = endpoint; this.processor = processor; MasterComponent component = endpoint.getComponent(); - String path = component.getCamelClusterPath(endpoint.getSingletonId()); + String path = component.getCamelClusterPath(endpoint.getGroupName()); this.singleton = component.createGroup(path); this.singleton.add(this); } + @ManagedAttribute(description = "Are we connected to ZooKeeper") + public boolean isConnected() { + return singleton.isConnected(); + } + + @ManagedAttribute(description = "Are we the master") + public boolean isMaster() { + return singleton.isMaster(); + } + + @ManagedOperation(description = "Information about all the slaves") + public String slaves() { + try { + return new ObjectMapper() + .enable(SerializationFeature.INDENT_OUTPUT) + .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) + .writeValueAsString(singleton.slaves()); + } catch (Exception e) { + return null; + } + } + + @ManagedOperation(description = "Information about the last event in the cluster group") + public String lastEvent() { + Object event = singleton.getLastState(); + return event != null ? event.toString() : null; + } + + @ManagedOperation(description = "Information about this node") + public String thisNode() { + return thisNodeState != null ? thisNodeState.toString() : null; + } + @Override protected void doStart() throws Exception { super.doStart(); singleton.start(); - LOG.info("Attempting to become master for endpoint: " + endpoint + " in " + endpoint.getCamelContext() + " with singletonID: " + endpoint.getSingletonId()); - singleton.update(createNodeState()); - } - - private CamelNodeState createNodeState() { - CamelNodeState state = new CamelNodeState(endpoint.getSingletonId()); - state.consumer = endpoint.getChildEndpoint().getEndpointUri(); - return state; + LOG.info("Attempting to become master for endpoint: " + endpoint + " in " + endpoint.getCamelContext() + " with singletonID: " + endpoint.getGroupName()); + thisNodeState = createNodeState(); + singleton.update(thisNodeState); } @Override @@ -71,11 +107,19 @@ public class MasterConsumer extends DefaultConsumer implements GroupListener { } } + private CamelNodeState createNodeState() { + String containerId = endpoint.getComponent().getContainerIdFactory().newContainerId(); + CamelNodeState state = new CamelNodeState(endpoint.getGroupName(), containerId); + state.consumer = endpoint.getConsumerEndpoint().getEndpointUri(); + return state; + } + protected void stopConsumer() throws Exception { ServiceHelper.stopAndShutdownServices(delegate); - ServiceHelper.stopAndShutdownServices(endpoint.getChildEndpoint()); + ServiceHelper.stopAndShutdownServices(endpoint.getConsumerEndpoint()); delegate = null; delegateService = null; + thisNodeState = null; } @Override @@ -115,42 +159,41 @@ public class MasterConsumer extends DefaultConsumer implements GroupListener { break; case DISCONNECTED: try { - LOG.info("Disconnecting as Master. Stopping consumer: {}", endpoint.getChildEndpoint()); + LOG.info("Disconnecting as master. Stopping consumer: {}", endpoint.getConsumerEndpoint()); stopConsumer(); } catch (Exception e) { - LOG.error("Failed to stop master consumer for: " + endpoint + ". Reason: " + e, e); + LOG.warn("Failed to stop master consumer for: " + endpoint + ". This exception is ignored.", e); } break; default: // noop } - } protected void onLockOwned() { if (delegate == null) { try { // ensure endpoint is also started - LOG.info("Elected as master. Starting consumer: {}", endpoint.getChildEndpoint()); - ServiceHelper.startService(endpoint.getChildEndpoint()); + LOG.info("Elected as master. Starting consumer: {}", endpoint.getConsumerEndpoint()); + ServiceHelper.startService(endpoint.getConsumerEndpoint()); - delegate = endpoint.getChildEndpoint().createConsumer(processor); + delegate = endpoint.getConsumerEndpoint().createConsumer(processor); delegateService = null; if (delegate instanceof SuspendableService) { delegateService = (SuspendableService) delegate; } // Lets show we are starting the consumer. - CamelNodeState nodeState = createNodeState(); - nodeState.started = true; - singleton.update(nodeState); + thisNodeState = createNodeState(); + thisNodeState.started = true; + singleton.update(thisNodeState); ServiceHelper.startService(delegate); } catch (Exception e) { - LOG.error("Failed to start master consumer for: " + endpoint + ". Reason: " + e, e); + LOG.error("Failed to start master consumer for: " + endpoint, e); } - LOG.info("Elected as master. Consumer started: {}", endpoint.getChildEndpoint()); + LOG.info("Elected as master. Consumer started: {}", endpoint.getConsumerEndpoint()); } } http://git-wip-us.apache.org/repos/asf/camel/blob/09fb6426/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterEndpoint.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterEndpoint.java index 6e034ed..2731576 100644 --- a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterEndpoint.java +++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/MasterEndpoint.java @@ -21,6 +21,8 @@ import org.apache.camel.DelegateEndpoint; import org.apache.camel.Endpoint; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedResource; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; @@ -29,32 +31,46 @@ import org.apache.camel.spi.UriPath; /** * Represents an endpoint which only becomes active when it obtains the master lock */ -@UriEndpoint(firstVersion = "2.19.0", scheme = "zookeeper-master", syntax = "zookeeper-master:name:endpoint", consumerClass = MasterConsumer.class, consumerOnly = true, +@ManagedResource(description = "Managed ZooKeeper Master Endpoint") +@UriEndpoint(firstVersion = "2.19.0", scheme = "zookeeper-master", syntax = "zookeeper-master:groupName:consumerEndpointUri", consumerClass = MasterConsumer.class, consumerOnly = true, title = "ZooKeeper Master", lenientProperties = true, label = "clustering") public class MasterEndpoint extends DefaultEndpoint implements DelegateEndpoint { private final MasterComponent component; + private final Endpoint consumerEndpoint; - private final Endpoint childEndpoint; - - @UriPath(name = "name", description = "The name of the cluster group to use") + @UriPath(description = "The name of the cluster group to use") @Metadata(required = "true") - private final String singletonId; + private final String groupName; - @UriPath(name = "endpoint", description = "The Camel endpoint to use in master/slave mode") + @UriPath(description = "The consumer endpoint to use in master/slave mode") @Metadata(required = "true") - private final String child; + private final String consumerEndpointUri; - public MasterEndpoint(String uri, MasterComponent component, String singletonId, String child) { + public MasterEndpoint(String uri, MasterComponent component, String groupName, String consumerEndpointUri) { super(uri, component); this.component = component; - this.singletonId = singletonId; - this.child = child; - this.childEndpoint = getCamelContext().getEndpoint(child); + this.groupName = groupName; + this.consumerEndpointUri = consumerEndpointUri; + this.consumerEndpoint = getCamelContext().getEndpoint(consumerEndpointUri); + } + + public Endpoint getEndpoint() { + return consumerEndpoint; + } + + public Endpoint getConsumerEndpoint() { + return getEndpoint(); + } + + @ManagedAttribute(description = "The consumer endpoint url to use in master/slave mode", mask = true) + public String getConsumerEndpointUri() { + return consumerEndpointUri; } - public String getSingletonId() { - return singletonId; + @ManagedAttribute(description = "The name of the cluster group to use") + public String getGroupName() { + return groupName; } @Override @@ -76,23 +92,8 @@ public class MasterEndpoint extends DefaultEndpoint implements DelegateEndpoint return true; } - // Properties - //------------------------------------------------------------------------- public MasterComponent getComponent() { return component; } - public String getChild() { - return child; - } - - // Implementation methods - //------------------------------------------------------------------------- - Endpoint getChildEndpoint() { - return childEndpoint; - } - - public Endpoint getEndpoint() { - return getChildEndpoint(); - } } http://git-wip-us.apache.org/repos/asf/camel/blob/09fb6426/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/ZKComponentSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/ZKComponentSupport.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/ZKComponentSupport.java deleted file mode 100644 index 30d9e04..0000000 --- a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/ZKComponentSupport.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.zookeepermaster; - -import java.util.concurrent.Callable; - -import org.apache.camel.component.zookeepermaster.group.Group; -import org.apache.camel.component.zookeepermaster.group.internal.ManagedGroupFactory; -import org.apache.camel.component.zookeepermaster.group.internal.ManagedGroupFactoryBuilder; -import org.apache.camel.impl.DefaultComponent; -import org.apache.camel.spi.Metadata; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.curator.retry.RetryOneTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class ZKComponentSupport extends DefaultComponent implements Callable<CuratorFramework>, ConnectionStateListener { - private static final transient Logger LOG = LoggerFactory.getLogger(ZKComponentSupport.class); - - private static final String ZOOKEEPER_URL = "zookeeper.url"; - private static final String ZOOKEEPER_PASSWORD = "zookeeper.password"; - private static final String ZOOKEEPER_URL_ENV = "ZOOKEEPER_URL"; - private static final String ZOOKEEPER_HOST_ENV = "ZK_CLIENT_SERVICE_HOST"; - private static final String ZOOKEEPER_PORT_ENV = "ZK_CLIENT_SERVICE_PORT"; - - private ManagedGroupFactory managedGroupFactory; - - @Metadata(label = "advanced") - private CuratorFramework curator; - @Metadata(defaultValue = "10000") - private int maximumConnectionTimeout = 10 * 1000; - @Metadata(defaultValue = "localhost:2181") - private String zooKeeperUrl; - @Metadata(label = "security", secret = true) - private String zooKeeperPassword; - - public CuratorFramework getCurator() { - if (managedGroupFactory == null) { - throw new IllegalStateException("Component is not started"); - } - return managedGroupFactory.getCurator(); - } - - public Group<CamelNodeState> createGroup(String path) { - if (managedGroupFactory == null) { - throw new IllegalStateException("Component is not started"); - } - return managedGroupFactory.createGroup(path, CamelNodeState.class); - } - - /** - * To use a custom configured CuratorFramework as connection to zookeeper ensemble. - */ - public void setCurator(CuratorFramework curator) { - this.curator = curator; - registerAsListener(); - } - - public int getMaximumConnectionTimeout() { - return maximumConnectionTimeout; - } - - /** - * Timeout in millis to use when connecting to the zookeeper ensemble - */ - public void setMaximumConnectionTimeout(int maximumConnectionTimeout) { - this.maximumConnectionTimeout = maximumConnectionTimeout; - } - - public String getZooKeeperUrl() { - return zooKeeperUrl; - } - - /** - * The url for the zookeeper ensemble - */ - public void setZooKeeperUrl(String zooKeeperUrl) { - this.zooKeeperUrl = zooKeeperUrl; - } - - public String getZooKeeperPassword() { - return zooKeeperPassword; - } - - /** - * The password to use when connecting to the zookeeper ensemble - */ - public void setZooKeeperPassword(String zooKeeperPassword) { - this.zooKeeperPassword = zooKeeperPassword; - } - - @Override - protected void doStart() throws Exception { - super.doStart(); - if (curator == null) { - try { - CuratorFramework aCurator = (CuratorFramework) getCamelContext().getRegistry().lookupByName("curator"); - if (aCurator != null) { - setCurator(aCurator); - } - if (curator != null) { - LOG.debug("Zookeeper client found in camel registry. " + curator); - } - } catch (Exception exception) { - // ignore - } - } - - // will auto create curator if needed - managedGroupFactory = ManagedGroupFactoryBuilder.create(curator, getClass().getClassLoader(), getCamelContext().getClassResolver(), this); - } - - public CuratorFramework call() throws Exception { - String connectString = getZooKeeperUrl(); - if (connectString == null) { - connectString = System.getenv(ZOOKEEPER_URL_ENV); - } - if (connectString == null) { - String zkHost = System.getenv(ZOOKEEPER_HOST_ENV); - if (zkHost != null) { - String zkPort = System.getenv(ZOOKEEPER_PORT_ENV); - connectString = zkHost + ":" + (zkPort == null ? "2181" : zkPort); - } - } - if (connectString == null) { - connectString = System.getProperty(ZOOKEEPER_URL, "localhost:2181"); - } - String password = getZooKeeperPassword(); - if (password == null) { - System.getProperty(ZOOKEEPER_PASSWORD); - } - LOG.debug("CuratorFramework not found in Camel registry, creating new with connection " + connectString); - CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() - .connectString(connectString) - .retryPolicy(new RetryOneTime(1000)) - .connectionTimeoutMs(getMaximumConnectionTimeout()); - - if (password != null && !password.isEmpty()) { - builder.authorization("digest", ("fabric:" + password).getBytes()); - } - - curator = builder.build(); - LOG.debug("Starting curator " + curator); - curator.start(); - return curator; - } - - @Override - protected void doStop() throws Exception { - super.doStop(); - if (managedGroupFactory != null) { - managedGroupFactory.close(); - managedGroupFactory = null; - } - } - - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) { - LOG.info("Curator Connection new state: " + newState); - } - - protected void registerAsListener() { - if (curator != null) { - curator.getConnectionStateListenable().addListener(this); - } - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/09fb6426/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/ZookeeperComponentSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/ZookeeperComponentSupport.java b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/ZookeeperComponentSupport.java new file mode 100644 index 0000000..0877243 --- /dev/null +++ b/components/camel-zookeeper-master/src/main/java/org/apache/camel/component/zookeepermaster/ZookeeperComponentSupport.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.zookeepermaster; + +import java.util.concurrent.Callable; + +import org.apache.camel.component.zookeepermaster.group.Group; +import org.apache.camel.component.zookeepermaster.group.internal.ManagedGroupFactory; +import org.apache.camel.component.zookeepermaster.group.internal.ManagedGroupFactoryBuilder; +import org.apache.camel.impl.DefaultComponent; +import org.apache.camel.spi.Metadata; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryOneTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class ZookeeperComponentSupport extends DefaultComponent implements Callable<CuratorFramework>, ConnectionStateListener { + private static final transient Logger LOG = LoggerFactory.getLogger(ZookeeperComponentSupport.class); + + private static final String ZOOKEEPER_URL = "zookeeper.url"; + private static final String ZOOKEEPER_PASSWORD = "zookeeper.password"; + private static final String ZOOKEEPER_URL_ENV = "ZOOKEEPER_URL"; + private static final String ZOOKEEPER_HOST_ENV = "ZK_CLIENT_SERVICE_HOST"; + private static final String ZOOKEEPER_PORT_ENV = "ZK_CLIENT_SERVICE_PORT"; + + private ManagedGroupFactory managedGroupFactory; + + @Metadata(label = "advanced") + private CuratorFramework curator; + @Metadata(defaultValue = "10000") + private int maximumConnectionTimeout = 10 * 1000; + @Metadata(defaultValue = "localhost:2181") + private String zooKeeperUrl; + @Metadata(label = "security", secret = true) + private String zooKeeperPassword; + + public CuratorFramework getCurator() { + if (managedGroupFactory == null) { + throw new IllegalStateException("Component is not started"); + } + return managedGroupFactory.getCurator(); + } + + public Group<CamelNodeState> createGroup(String path) { + if (managedGroupFactory == null) { + throw new IllegalStateException("Component is not started"); + } + return managedGroupFactory.createGroup(path, CamelNodeState.class); + } + + /** + * To use a custom configured CuratorFramework as connection to zookeeper ensemble. + */ + public void setCurator(CuratorFramework curator) { + this.curator = curator; + registerAsListener(); + } + + public int getMaximumConnectionTimeout() { + return maximumConnectionTimeout; + } + + /** + * Timeout in millis to use when connecting to the zookeeper ensemble + */ + public void setMaximumConnectionTimeout(int maximumConnectionTimeout) { + this.maximumConnectionTimeout = maximumConnectionTimeout; + } + + public String getZooKeeperUrl() { + return zooKeeperUrl; + } + + /** + * The url for the zookeeper ensemble + */ + public void setZooKeeperUrl(String zooKeeperUrl) { + this.zooKeeperUrl = zooKeeperUrl; + } + + public String getZooKeeperPassword() { + return zooKeeperPassword; + } + + /** + * The password to use when connecting to the zookeeper ensemble + */ + public void setZooKeeperPassword(String zooKeeperPassword) { + this.zooKeeperPassword = zooKeeperPassword; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + // attempt to lookup curator framework from registry using the name curator + if (curator == null) { + try { + CuratorFramework aCurator = getCamelContext().getRegistry().lookupByNameAndType("curator", CuratorFramework.class); + if (aCurator != null) { + LOG.debug("CuratorFramework found in CamelRegistry: {}", aCurator); + setCurator(aCurator); + } + } catch (Exception exception) { + // ignore + } + } + + // will auto create curator if needed + managedGroupFactory = ManagedGroupFactoryBuilder.create(curator, getClass().getClassLoader(), getCamelContext().getClassResolver(), this); + } + + public CuratorFramework call() throws Exception { + String connectString = getZooKeeperUrl(); + if (connectString == null) { + connectString = System.getenv(ZOOKEEPER_URL_ENV); + } + if (connectString == null) { + String zkHost = System.getenv(ZOOKEEPER_HOST_ENV); + if (zkHost != null) { + String zkPort = System.getenv(ZOOKEEPER_PORT_ENV); + connectString = zkHost + ":" + (zkPort == null ? "2181" : zkPort); + } + } + if (connectString == null) { + connectString = System.getProperty(ZOOKEEPER_URL, "localhost:2181"); + } + String password = getZooKeeperPassword(); + if (password == null) { + System.getProperty(ZOOKEEPER_PASSWORD); + } + LOG.info("Creating new CuratorFramework with connection: {}", connectString); + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() + .connectString(connectString) + .retryPolicy(new RetryOneTime(1000)) + .connectionTimeoutMs(getMaximumConnectionTimeout()); + + if (password != null && !password.isEmpty()) { + builder.authorization("digest", ("fabric:" + password).getBytes()); + } + + curator = builder.build(); + LOG.debug("Starting CuratorFramework {}", curator); + curator.start(); + return curator; + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + if (managedGroupFactory != null) { + managedGroupFactory.close(); + managedGroupFactory = null; + } + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + LOG.debug("CuratorFramework state changed: {}", newState); + } + + protected void registerAsListener() { + if (curator != null) { + curator.getConnectionStateListenable().addListener(this); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/09fb6426/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/MasterEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/MasterEndpointTest.java b/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/MasterEndpointTest.java index b287428..03051b7 100644 --- a/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/MasterEndpointTest.java +++ b/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/MasterEndpointTest.java @@ -84,7 +84,7 @@ public class MasterEndpointTest extends AbstractJUnit4SpringContextTests { List<Route> registeredRoutes = camelContext.getRoutes(); assertEquals("number of routes", 1, registeredRoutes.size()); MasterEndpoint endpoint = (MasterEndpoint) registeredRoutes.get(0).getEndpoint(); - assertEquals("wrong endpoint uri", "seda:bar", endpoint.getChild()); + assertEquals("wrong endpoint uri", "seda:bar", endpoint.getConsumerEndpointUri()); String expectedBody = "<matched/>"; http://git-wip-us.apache.org/repos/asf/camel/blob/09fb6426/components/readme.adoc ---------------------------------------------------------------------- diff --git a/components/readme.adoc b/components/readme.adoc index 35e9644..cc108d7 100644 --- a/components/readme.adoc +++ b/components/readme.adoc @@ -632,7 +632,7 @@ Number of Components: 222 in 176 JAR artifacts (13 deprecated) | link:camel-twitter/src/main/docs/twitter-component.adoc[Twitter] (camel-twitter) + `twitter:kind` | 2.10 | This component integrates with Twitter to send tweets or search for tweets and more. -| link:null/src/main/docs/undertow-component.adoc[Undertow] (null) + +| link:camel-undertow/src/main/docs/undertow-component.adoc[Undertow] (camel-undertow) + `undertow:httpURI` | 2.16 | The undertow component provides HTTP-based endpoints for consuming and producing HTTP requests. | link:../camel-core/src/main/docs/validator-component.adoc[Validator] (camel-core) + http://git-wip-us.apache.org/repos/asf/camel/blob/09fb6426/platforms/spring-boot/components-starter/camel-zookeeper-master-starter/src/main/java/org/apache/camel/component/zookeepermaster/springboot/MasterComponentConfiguration.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-zookeeper-master-starter/src/main/java/org/apache/camel/component/zookeepermaster/springboot/MasterComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-zookeeper-master-starter/src/main/java/org/apache/camel/component/zookeepermaster/springboot/MasterComponentConfiguration.java index 8408447..8529a4d 100644 --- a/platforms/spring-boot/components-starter/camel-zookeeper-master-starter/src/main/java/org/apache/camel/component/zookeepermaster/springboot/MasterComponentConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-zookeeper-master-starter/src/main/java/org/apache/camel/component/zookeepermaster/springboot/MasterComponentConfiguration.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.zookeepermaster.springboot; +import org.apache.camel.component.zookeepermaster.ContainerIdFactory; import org.apache.curator.framework.CuratorFramework; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.NestedConfigurationProperty; @@ -30,6 +31,11 @@ import org.springframework.boot.context.properties.NestedConfigurationProperty; public class MasterComponentConfiguration { /** + * To use a custom ContainerIdFactory for creating container ids. + */ + @NestedConfigurationProperty + private ContainerIdFactory containerIdFactory; + /** * The root path to use in zookeeper where information is stored which nodes * are master/slave etc. Will by default use: * /camel/zookeepermaster/clusters/master @@ -60,6 +66,14 @@ public class MasterComponentConfiguration { */ private Boolean resolvePropertyPlaceholders = true; + public ContainerIdFactory getContainerIdFactory() { + return containerIdFactory; + } + + public void setContainerIdFactory(ContainerIdFactory containerIdFactory) { + this.containerIdFactory = containerIdFactory; + } + public String getZkRoot() { return zkRoot; } http://git-wip-us.apache.org/repos/asf/camel/blob/09fb6426/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml b/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml index cb9e151..3035ac7 100644 --- a/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml +++ b/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml @@ -2704,6 +2704,11 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-zookeeper-master</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-zookeeper-starter</artifactId> <version>${project.version}</version> </dependency>