Repository: camel Updated Branches: refs/heads/master f5832c648 -> dd93a625e
CAMEL-7999: More components include documentation Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dd93a625 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dd93a625 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dd93a625 Branch: refs/heads/master Commit: dd93a625e3345c7edcd13d04d0e050f87bd3f384 Parents: f5832c6 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Jan 6 13:29:56 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Jan 6 13:29:56 2015 +0100 ---------------------------------------------------------------------- .../component/cassandra/CassandraComponent.java | 144 ++++------- .../component/cassandra/CassandraConsumer.java | 14 +- .../component/cassandra/CassandraEndpoint.java | 242 ++++++++++++++----- .../component/cassandra/CassandraProducer.java | 2 +- .../CassandraComponentClusterBuilderTest.java | 81 ------- 5 files changed, 240 insertions(+), 243 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/dd93a625/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraComponent.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraComponent.java index 9a1dac6..a6e9e33 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraComponent.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraComponent.java @@ -17,14 +17,11 @@ package org.apache.camel.component.cassandra; import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.Session; import org.apache.camel.Endpoint; -import org.apache.camel.impl.DefaultComponent; -import org.apache.camel.util.EndpointHelper; +import org.apache.camel.impl.UriEndpointComponent; +import org.apache.camel.util.CamelContextHelper; +import org.apache.camel.util.ObjectHelper; /** * Represents the component that manages {@link CassandraEndpoint}. This @@ -40,109 +37,56 @@ import org.apache.camel.util.EndpointHelper; * <li>cql:bean:clusterRef/keyspace</li> * </ul> */ -public class CassandraComponent extends DefaultComponent { - /** - * Regular expression for parsing host name - */ - private static final String HOST_PATTERN = "[\\w.\\-]+"; - /** - * Regular expression for parsing several hosts name - */ - private static final String HOSTS_PATTERN = HOST_PATTERN + "(?:," + HOST_PATTERN + ")*"; - /** - * Regular expression for parsing port - */ - private static final String PORT_PATTERN = "\\d+"; - /** - * Regular expression for parsing keyspace - */ - private static final String KEYSPACE_PATTERN = "\\w+"; - /** - * Regular expression for parsing URI host1,host2:9042/keyspace - */ - private static final Pattern HOSTS_PORT_KEYSPACE_PATTERN = Pattern.compile( - "^(" + HOSTS_PATTERN + ")?" // Hosts - + "(?::(" + PORT_PATTERN + "))?" // Port - + "(?:/(" + KEYSPACE_PATTERN + "))?$"); // Keyspace - /** - * Regular expression for parsing URI bean:sessionRef - */ - private static final Pattern BEAN_REF_PATTERN = Pattern.compile( - "^bean:([\\w.\\-]+)(?:/(" + KEYSPACE_PATTERN + "))?$"); // Keyspace +public class CassandraComponent extends UriEndpointComponent { + + public CassandraComponent() { + super(CassandraEndpoint.class); + } @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - Cluster cluster; - Session session; - String keyspace; - // Try URI of type cql:bean:session or - Matcher beanRefMatcher = BEAN_REF_PATTERN.matcher(remaining); - if (beanRefMatcher.matches()) { - String beanRefName = beanRefMatcher.group(1); - keyspace = beanRefMatcher.group(2); - Object bean = EndpointHelper.resolveParameter(getCamelContext(), "#" + beanRefName, Object.class); - if (bean instanceof Session) { - session = (Session) bean; - cluster = session.getCluster(); - keyspace = session.getLoggedKeyspace(); - } else if (bean instanceof Cluster) { - cluster = (Cluster) bean; - session = null; + String beanRef = null; + String hosts = null; + String port = null; + String keyspace = null; + + int pos = remaining.lastIndexOf("/"); + if (pos > 0) { + keyspace = remaining.substring(pos + 1); + remaining = remaining.substring(0, pos); + } + + // if its a bean reference to either a cluster/session then lookup + if (remaining.startsWith("bean:")) { + beanRef = remaining.substring(5); + } else { + // hosts and port (port is optional) + if (remaining.contains(":")) { + port = ObjectHelper.after(remaining, ":"); + hosts = ObjectHelper.before(remaining, ":"); } else { - throw new IllegalArgumentException("CQL Bean type should be of type Session or Cluster but was " + bean); + hosts = remaining; } - } else { - // Try URI of type cql:host1,host2:9042/keyspace - cluster = clusterBuilder(remaining, parameters).build(); - session = null; - keyspace = getAndRemoveParameter(parameters, "keyspace", String.class); } - Endpoint endpoint = new CassandraEndpoint(uri, this, cluster, session, keyspace); - setProperties(endpoint, parameters); - return endpoint; - } - - /** - * Parse URI of the form cql://host1,host2:9042/keyspace and create a - * {@link Cluster.Builder} - */ - protected Cluster.Builder clusterBuilder(String remaining, Map<String, Object> parameters) throws NumberFormatException { - Cluster.Builder clusterBuilder = Cluster.builder(); - Matcher matcher = HOSTS_PORT_KEYSPACE_PATTERN.matcher(remaining); - if (matcher.matches()) { - // Parse hosts - String hostsGroup = matcher.group(1); - if (hostsGroup != null && !hostsGroup.isEmpty()) { - String[] hosts = hostsGroup.split(","); - clusterBuilder = clusterBuilder.addContactPoints(hosts); - } - // Parse port - String portGroup = matcher.group(2); - if (portGroup != null) { - Integer port = Integer.valueOf(portGroup); - clusterBuilder = clusterBuilder.withPort(port); - } - // Parse keyspace - String keyspaceGroup = matcher.group(3); - if (keyspaceGroup != null && !keyspaceGroup.isEmpty()) { - String keyspace = keyspaceGroup; - parameters.put("keyspace", keyspace); - } - } else { - throw new IllegalArgumentException("Invalid CQL URI"); + ResultSetConversionStrategy rs = null; + String strategy = getAndRemoveParameter(parameters, "resultSetConversionStrategy", String.class); + if (strategy != null) { + rs = ResultSetConversionStrategies.fromName(strategy); } - // Cluster name parameter - String clusterName = getAndRemoveParameter(parameters, "clusterName", String.class); - if (clusterName != null) { - clusterBuilder = clusterBuilder.withClusterName(clusterName); + CassandraEndpoint endpoint = new CassandraEndpoint(uri, this); + endpoint.setBeanRef(beanRef); + endpoint.setHosts(hosts); + if (port != null) { + int num = CamelContextHelper.parseInteger(getCamelContext(), port); + endpoint.setPort(num); } - // Username and password - String username = getAndRemoveOrResolveReferenceParameter(parameters, "username", String.class); - String password = getAndRemoveOrResolveReferenceParameter(parameters, "password", String.class); - if (username != null && !username.isEmpty() && password != null) { - clusterBuilder.withCredentials(username, password); + endpoint.setKeyspace(keyspace); + if (rs != null) { + endpoint.setResultSetConversionStrategy(rs); } - return clusterBuilder; + setProperties(endpoint, parameters); + return endpoint; } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/dd93a625/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java index 952eae3..168a7c2 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java @@ -46,10 +46,7 @@ public class CassandraConsumer extends ScheduledPollConsumer { @Override protected int poll() throws Exception { // Execute CQL Query - Session session = getEndpoint().getSession(); - if (preparedStatement == null) { - preparedStatement = getEndpoint().prepareStatement(); - } + Session session = getEndpoint().getSessionHolder().getSession(); ResultSet resultSet = session.execute(preparedStatement.bind()); // Create message from ResultSet @@ -68,4 +65,13 @@ public class CassandraConsumer extends ScheduledPollConsumer { } } } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + if (preparedStatement == null) { + preparedStatement = getEndpoint().prepareStatement(); + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/dd93a625/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java index 49d3a71..5d6ad1f 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java @@ -21,64 +21,68 @@ import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; +import org.apache.camel.Component; import org.apache.camel.Consumer; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; +import org.apache.camel.util.CamelContextHelper; import org.apache.camel.utils.cassandra.CassandraSessionHolder; /** * Cassandra 2 CQL3 endpoint */ +@UriEndpoint(scheme = "cql", consumerClass = CassandraConsumer.class, label = "database,nosql") public class CassandraEndpoint extends DefaultEndpoint { - /** - * Session holder - */ - private CassandraSessionHolder sessionHolder; - /** - * CQL query - */ + + private volatile CassandraSessionHolder sessionHolder; + + @UriPath + private String beanRef; + @UriPath + private String hosts; + @UriPath + private Integer port; + @UriPath + private String keyspace; + @UriParam private String cql; + @UriParam + private String clusterName; + @UriParam + private String username; + @UriParam + private String password; + @UriParam + private Cluster cluster; + @UriParam + private Session session; + /** * Consistency level: ONE, TWO, QUORUM, LOCAL_QUORUM, ALL... */ @UriParam private ConsistencyLevel consistencyLevel; + /** * How many rows should be retrieved in message body */ + @UriParam private ResultSetConversionStrategy resultSetConversionStrategy = ResultSetConversionStrategies.all(); - /** - * Cassandra URI - * - * @param uri - * @param component Parent component - * @param cluster Cluster (required) - * @param session Session (optional) - * @param keyspace Keyspace (optional) - */ - public CassandraEndpoint(String uri, CassandraComponent component, Cluster cluster, Session session, String keyspace) { - super(uri, component); - if (session == null) { - sessionHolder = new CassandraSessionHolder(cluster, keyspace); - } else { - sessionHolder = new CassandraSessionHolder(session); - } + public CassandraEndpoint(String endpointUri, Component component) { + super(endpointUri, component); } - @Override - protected void doStart() throws Exception { - super.doStart(); - sessionHolder.start(); - } - - @Override - protected void doStop() throws Exception { - sessionHolder.stop(); - super.doStop(); + public CassandraEndpoint(String uri, CassandraComponent component, Cluster cluster, Session session, String keyspace) { + super(uri, component); + this.cluster = cluster; + this.session = session; + this.keyspace = keyspace; } public Producer createProducer() throws Exception { @@ -93,47 +97,72 @@ public class CassandraEndpoint extends DefaultEndpoint { return true; } - public Session getSession() { - return sessionHolder.getSession(); - } + @Override + protected void doStart() throws Exception { + super.doStart(); - public String getCql() { - return cql; - } + // we can get the cluster using various ways - public void setCql(String cql) { - this.cql = cql; - } + if (cluster == null && beanRef != null) { + Object bean = CamelContextHelper.mandatoryLookup(getCamelContext(), beanRef); + if (bean instanceof Session) { + session = (Session) bean; + cluster = session.getCluster(); + keyspace = session.getLoggedKeyspace(); + } else if (bean instanceof Cluster) { + cluster = (Cluster) bean; + session = null; + } else { + throw new IllegalArgumentException("CQL Bean type should be of type Session or Cluster but was " + bean); + } + } - public String getKeyspace() { - return sessionHolder.getKeyspace(); - } + if (cluster == null && hosts != null) { + // use the cluster builder to create the cluster + cluster = createClusterBuilder().build(); + } - public ConsistencyLevel getConsistencyLevel() { - return consistencyLevel; - } + if (cluster != null) { + sessionHolder = new CassandraSessionHolder(cluster, keyspace); + } else { + sessionHolder = new CassandraSessionHolder(session); + } - public void setConsistencyLevel(ConsistencyLevel consistencyLevel) { - this.consistencyLevel = consistencyLevel; + sessionHolder.start(); } - public ResultSetConversionStrategy getResultSetConversionStrategy() { - return resultSetConversionStrategy; + @Override + protected void doStop() throws Exception { + super.doStop(); + sessionHolder.stop(); } - public void setResultSetConversionStrategy(ResultSetConversionStrategy resultSetConversionStrategy) { - this.resultSetConversionStrategy = resultSetConversionStrategy; + protected CassandraSessionHolder getSessionHolder() { + return sessionHolder; } - public void setResultSetConversionStrategy(String converter) { - this.resultSetConversionStrategy = ResultSetConversionStrategies.fromName(converter); + protected Cluster.Builder createClusterBuilder() throws Exception { + Cluster.Builder clusterBuilder = Cluster.builder(); + for (String host : hosts.split(",")) { + clusterBuilder = clusterBuilder.addContactPoint(host); + } + if (port != null) { + clusterBuilder = clusterBuilder.withPort(port); + } + if (clusterName != null) { + clusterBuilder = clusterBuilder.withClusterName(clusterName); + } + if (username != null && !username.isEmpty() && password != null) { + clusterBuilder.withCredentials(username, password); + } + return clusterBuilder; } /** * Create and configure a Prepared CQL statement */ protected PreparedStatement prepareStatement(String cql) { - PreparedStatement preparedStatement = getSession().prepare(cql); + PreparedStatement preparedStatement = getSessionHolder().getSession().prepare(cql); if (consistencyLevel != null) { preparedStatement.setConsistencyLevel(consistencyLevel); } @@ -154,4 +183,103 @@ public class CassandraEndpoint extends DefaultEndpoint { message.setBody(resultSetConversionStrategy.getBody(resultSet)); } + public String getBeanRef() { + return beanRef; + } + + public void setBeanRef(String beanRef) { + this.beanRef = beanRef; + } + + public String getHosts() { + return hosts; + } + + public void setHosts(String hosts) { + this.hosts = hosts; + } + + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } + + public String getKeyspace() { + return keyspace; + } + + public void setKeyspace(String keyspace) { + this.keyspace = keyspace; + } + + public String getCql() { + return cql; + } + + public void setCql(String cql) { + this.cql = cql; + } + + public Cluster getCluster() { + return cluster; + } + + public void setCluster(Cluster cluster) { + this.cluster = cluster; + } + + public Session getSession() { + if (session == null) { + return sessionHolder.getSession(); + } else { + return session; + } + } + + public void setSession(Session session) { + this.session = session; + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public ConsistencyLevel getConsistencyLevel() { + return consistencyLevel; + } + + public void setConsistencyLevel(ConsistencyLevel consistencyLevel) { + this.consistencyLevel = consistencyLevel; + } + + public ResultSetConversionStrategy getResultSetConversionStrategy() { + return resultSetConversionStrategy; + } + + public void setResultSetConversionStrategy(ResultSetConversionStrategy resultSetConversionStrategy) { + this.resultSetConversionStrategy = resultSetConversionStrategy; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/dd93a625/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java index 9921191..cc53775 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java @@ -84,7 +84,7 @@ public class CassandraProducer extends DefaultProducer { // Message CQL lPreparedStatement = getEndpoint().prepareStatement(messageCql); } - Session session = getEndpoint().getSession(); + Session session = getEndpoint().getSessionHolder().getSession(); if (cqlParams == null) { resultSet = session.execute(lPreparedStatement.bind()); } else { http://git-wip-us.apache.org/repos/asf/camel/blob/dd93a625/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentClusterBuilderTest.java ---------------------------------------------------------------------- diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentClusterBuilderTest.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentClusterBuilderTest.java deleted file mode 100644 index 826cd17..0000000 --- a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/CassandraComponentClusterBuilderTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.cassandra; - -import java.util.HashMap; -import java.util.Map; - -import com.datastax.driver.core.Cluster; -import org.apache.camel.impl.DefaultCamelContext; -import org.junit.Before; -import org.junit.Test; - -import static org.hamcrest.Matchers.isOneOf; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; - -/** - * Unit test for {@link CassandraComponent} - */ -public class CassandraComponentClusterBuilderTest { - private final CassandraComponent component = new CassandraComponent(); - - @Before - public void setUp() { - component.setCamelContext(new DefaultCamelContext()); - } - - @Test - public void testClusterBuilderBasic() { - Map<String, Object> params = new HashMap<String, Object>(); - params.put("clusterName", "cluster"); - Cluster.Builder clusterBuilder = component.clusterBuilder("127.0.0.1,127.0.0.2/keyspace", params); - - assertEquals(2, clusterBuilder.getContactPoints().size()); - assertThat(clusterBuilder.getContactPoints().get(0).getHostName(), isOneOf("127.0.0.1", "localhost")); - assertThat(clusterBuilder.getContactPoints().get(1).getHostName(), isOneOf("127.0.0.2", "localhost")); - assertEquals("cluster", clusterBuilder.getClusterName()); - assertEquals("keyspace", params.get("keyspace")); - } - - @Test - public void testClusterBuilderPort() { - Map<String, Object> params = new HashMap<String, Object>(); - params.put("clusterName", "cluster"); - Cluster.Builder clusterBuilder = component.clusterBuilder("127.0.0.1,127.0.0.2:1234/keyspace", params); - - assertEquals(2, clusterBuilder.getContactPoints().size()); - assertThat(clusterBuilder.getContactPoints().get(0).getHostName(), isOneOf("127.0.0.1", "localhost")); - assertEquals(1234, clusterBuilder.getContactPoints().get(0).getPort()); - assertThat(clusterBuilder.getContactPoints().get(1).getHostName(), isOneOf("127.0.0.2", "localhost")); - assertEquals(1234, clusterBuilder.getConfiguration().getProtocolOptions().getPort()); - assertEquals("cluster", clusterBuilder.getClusterName()); - assertEquals("keyspace", params.get("keyspace")); - } - - @Test - public void testClusterBuilderSimplest() { - Map<String, Object> params = new HashMap<String, Object>(); - Cluster.Builder clusterBuilder = component.clusterBuilder("127.0.0.1", params); - - assertEquals(1, clusterBuilder.getContactPoints().size()); - assertThat(clusterBuilder.getContactPoints().get(0).getHostName(), isOneOf("127.0.0.1", "localhost")); - assertNull(params.get("keyspace")); - } - -}