CAMEL-9496: camel-hbase - The mapping options in the uri should use prefix
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e7118d8a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e7118d8a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e7118d8a Branch: refs/heads/master Commit: e7118d8ad6bb5a2003eae9f8bcfa9bfed64e3dfb Parents: a8e974f Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Jan 10 13:53:15 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Jan 10 13:53:15 2016 +0100 ---------------------------------------------------------------------- .../camel/component/hbase/HBaseComponent.java | 41 ++----------- .../camel/component/hbase/HBaseEndpoint.java | 63 ++++++++++++++++++++ .../component/hbase/HBaseConvertionsTest.java | 2 +- .../component/hbase/HBaseProducerTest.java | 2 +- 4 files changed, 69 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e7118d8a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java index 339ff72..dcb8ac6 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java @@ -19,9 +19,8 @@ package org.apache.camel.component.hbase; import java.util.Map; import org.apache.camel.Endpoint; -import org.apache.camel.component.hbase.model.HBaseCell; -import org.apache.camel.component.hbase.model.HBaseRow; import org.apache.camel.impl.UriEndpointComponent; +import org.apache.camel.util.IntrospectionSupport; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTablePool; @@ -55,45 +54,13 @@ public class HBaseComponent extends UriEndpointComponent { } protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - String tableName = remaining; - - HBaseEndpoint endpoint = new HBaseEndpoint(uri, this, tablePool, tableName); - HBaseRow parameterRowModel = createRowModel(parameters); + HBaseEndpoint endpoint = new HBaseEndpoint(uri, this, tablePool, remaining); + Map<String, Object> mapping = IntrospectionSupport.extractProperties(parameters, "row."); + endpoint.setRowMapping(mapping); setProperties(endpoint, parameters); - if (endpoint.getRowModel() == null) { - endpoint.setRowModel(parameterRowModel); - } return endpoint; } - /** - * Creates an {@link HBaseRow} model from the specified endpoint parameters. - */ - public HBaseRow createRowModel(Map<String, Object> parameters) { - HBaseRow rowModel = new HBaseRow(); - if (parameters.containsKey(HbaseAttribute.HBASE_ROW_TYPE.asOption())) { - String rowType = String.valueOf(parameters.remove(HbaseAttribute.HBASE_ROW_TYPE.asOption())); - if (rowType != null && !rowType.isEmpty()) { - rowModel.setRowType(getCamelContext().getClassResolver().resolveClass(rowType)); - } - } - for (int i = 1; parameters.get(HbaseAttribute.HBASE_FAMILY.asOption(i)) != null - && parameters.get(HbaseAttribute.HBASE_QUALIFIER.asOption(i)) != null; i++) { - HBaseCell cellModel = new HBaseCell(); - cellModel.setFamily(String.valueOf(parameters.remove(HbaseAttribute.HBASE_FAMILY.asOption(i)))); - cellModel.setQualifier(String.valueOf(parameters.remove(HbaseAttribute.HBASE_QUALIFIER.asOption(i)))); - cellModel.setValue(String.valueOf(parameters.remove(HbaseAttribute.HBASE_VALUE.asOption(i)))); - if (parameters.containsKey(HbaseAttribute.HBASE_VALUE_TYPE.asOption(i))) { - String valueType = String.valueOf(parameters.remove(HbaseAttribute.HBASE_VALUE_TYPE.asOption(i))); - if 
(valueType != null && !valueType.isEmpty()) { - rowModel.setRowType(getCamelContext().getClassResolver().resolveClass(valueType)); - } - } - rowModel.getCells().add(cellModel); - } - return rowModel; - } - public Configuration getConfiguration() { return configuration; } http://git-wip-us.apache.org/repos/asf/camel/blob/e7118d8a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java index f08ee93..27c5773 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java @@ -18,11 +18,13 @@ package org.apache.camel.component.hbase; import java.security.PrivilegedAction; import java.util.List; +import java.util.Map; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.component.hbase.mapping.CellMappingStrategyFactory; +import org.apache.camel.component.hbase.model.HBaseCell; import org.apache.camel.component.hbase.model.HBaseRow; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.spi.Metadata; @@ -70,6 +72,8 @@ public class HBaseEndpoint extends DefaultEndpoint { private int maxMessagesPerPoll; @UriParam private UserGroupInformation userGroupInformation; + @UriParam(prefix = "row.", multiValue = true) + private Map<String, Object> rowMapping; /** * in the purpose of performance optimization @@ -241,6 +245,36 @@ public class HBaseEndpoint extends DefaultEndpoint { this.userGroupInformation = userGroupInformation; } + public Map<String, Object> getRowMapping() { + return rowMapping; + } + + /** + * To map the key/values from the Map to a {@link HBaseRow}. 
+ * <p/> + * The following keys are supported: + * <ul> + * <li>rowId - The id of the row. This has limited use as the row usually changes per Exchange.</li> + * <li>rowType - The type to convert row id to. Supported operations: CamelHBaseScan.</li> + * <li>family - The column family. Supports a number suffix for referring to more than one column.</li> + * <li>qualifier - The column qualifier. Supports a number suffix for referring to more than one column.</li> + * <li>value - The value. Supports a number suffix for referring to more than one column.</li> + * <li>valueType - The value type. Supports a number suffix for referring to more than one column. Supported operations: CamelHBaseGet, and CamelHBaseScan.</li> + * </ul> + */ + public void setRowMapping(Map<String, Object> rowMapping) { + this.rowMapping = rowMapping; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + if (rowModel == null && rowMapping != null) { + rowModel = createRowModel(rowMapping); + } + } + /** * Gets connection to the table (secured or not, depends on the object initialization) * please remember to close the table after use @@ -258,4 +292,33 @@ public class HBaseEndpoint extends DefaultEndpoint { return tablePool.getTable(tableNameBytes); } } + + /** + * Creates an {@link HBaseRow} model from the specified endpoint parameters. 
+ */ + private HBaseRow createRowModel(Map<String, Object> parameters) { + HBaseRow rowModel = new HBaseRow(); + if (parameters.containsKey(HbaseAttribute.HBASE_ROW_TYPE.asOption())) { + String rowType = String.valueOf(parameters.remove(HbaseAttribute.HBASE_ROW_TYPE.asOption())); + if (rowType != null && !rowType.isEmpty()) { + rowModel.setRowType(getCamelContext().getClassResolver().resolveClass(rowType)); + } + } + for (int i = 1; parameters.get(HbaseAttribute.HBASE_FAMILY.asOption(i)) != null + && parameters.get(HbaseAttribute.HBASE_QUALIFIER.asOption(i)) != null; i++) { + HBaseCell cellModel = new HBaseCell(); + cellModel.setFamily(String.valueOf(parameters.remove(HbaseAttribute.HBASE_FAMILY.asOption(i)))); + cellModel.setQualifier(String.valueOf(parameters.remove(HbaseAttribute.HBASE_QUALIFIER.asOption(i)))); + cellModel.setValue(String.valueOf(parameters.remove(HbaseAttribute.HBASE_VALUE.asOption(i)))); + if (parameters.containsKey(HbaseAttribute.HBASE_VALUE_TYPE.asOption(i))) { + String valueType = String.valueOf(parameters.remove(HbaseAttribute.HBASE_VALUE_TYPE.asOption(i))); + if (valueType != null && !valueType.isEmpty()) { + rowModel.setRowType(getCamelContext().getClassResolver().resolveClass(valueType)); + } + } + rowModel.getCells().add(cellModel); + } + return rowModel; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/e7118d8a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java index 1313936..9813744 100644 --- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java +++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java @@ -101,7 +101,7 
@@ public class HBaseConvertionsTest extends CamelHBaseTestSupport { .to("hbase://" + PERSON_TABLE); from("direct:scan") - .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2&family=family1&qualifier=column1"); + .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2&row.family=family1&row.qualifier=column1"); } }; } http://git-wip-us.apache.org/repos/asf/camel/blob/e7118d8a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java index 9e71c65..fa3f229 100644 --- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java +++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java @@ -323,7 +323,7 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { .to("hbase://" + PERSON_TABLE); from("direct:start-with-model") - .to("hbase://" + PERSON_TABLE + "?family=info&qualifier=firstName&family2=birthdate&qualifier2=year"); + .to("hbase://" + PERSON_TABLE + "?row.family=info&row.qualifier=firstName&row.family2=birthdate&row.qualifier2=year"); from("direct:scan") .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN);