Repository: camel Updated Branches: refs/heads/master 800d74f70 -> bb3a75efc
CAMEL-9644 - camel-hbase : remove deprecated code Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bb3a75ef Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bb3a75ef Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bb3a75ef Branch: refs/heads/master Commit: bb3a75efcec72fa1c9f45069353a091b77697d95 Parents: 800d74f Author: lburgazzoli <lburgazz...@gmail.com> Authored: Thu Feb 25 15:35:46 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Feb 25 17:14:32 2016 +0100 ---------------------------------------------------------------------- .../camel/component/hbase/HBaseAttribute.java | 73 +++++++++ .../camel/component/hbase/HBaseComponent.java | 18 ++- .../camel/component/hbase/HBaseConsumer.java | 26 ++-- .../camel/component/hbase/HBaseContats.java | 34 ----- .../component/hbase/HBaseDeleteHandler.java | 5 +- .../camel/component/hbase/HBaseEndpoint.java | 49 +++--- .../camel/component/hbase/HBaseProducer.java | 59 ++++---- .../component/hbase/HBaseRemoveHandler.java | 4 +- .../camel/component/hbase/HbaseAttribute.java | 65 -------- .../hbase/mapping/CellMappingStrategy.java | 2 +- .../mapping/CellMappingStrategyFactory.java | 10 +- .../hbase/mapping/HeaderMappingStrategy.java | 26 ++-- .../idempotent/HBaseIdempotentRepository.java | 72 +++------ .../component/hbase/CamelHBaseFilterTest.java | 13 +- .../component/hbase/CamelHBaseTestSupport.java | 10 +- .../component/hbase/HBaseConsumerTest.java | 12 +- .../component/hbase/HBaseConvertionsTest.java | 25 ++-- .../component/hbase/HBaseProducerTest.java | 148 ++++++++++--------- .../HBaseIdempotentRepositoryTest.java | 3 - .../src/test/resources/log4j.properties | 1 - parent/pom.xml | 2 +- 21 files changed, 312 insertions(+), 345 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseAttribute.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseAttribute.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseAttribute.java new file mode 100644 index 0000000..56b190e --- /dev/null +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseAttribute.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hbase; + +public enum HBaseAttribute { + + HBASE_ROW_ID("CamelHBaseRowId"), + HBASE_ROW_TYPE("CamelHBaseRowType"), + HBASE_MARKED_ROW_ID("CamelHBaseMarkedRowId"), + HBASE_FAMILY("CamelHBaseFamily"), + HBASE_QUALIFIER("CamelHBaseQualifier"), + HBASE_VALUE("CamelHBaseValue"), + HBASE_VALUE_TYPE("CamelHBaseValueType"); + + private final String value; + private final String option; + + HBaseAttribute(String value) { + this.value = value; + this.option = asOption(value); + } + + public String asHeader(int i) { + if (i > 1) { + return value + i; + } else { + return value; + } + } + + public String asHeader() { + return value; + } + + public String asOption() { + return option; + } + + public String asOption(int i) { + if (i > 1) { + return option + i; + } else { + return option; + } + } + + @Override + public String toString() { + return value; + } + + private static String asOption(String name) { + StringBuilder sb = new StringBuilder(); + sb.append(name, "CamelHBase".length(), name.length()); + sb.setCharAt(0, Character.toLowerCase(sb.charAt(0))); + + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java index dcb8ac6..731b288 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java @@ -17,13 +17,15 @@ package org.apache.camel.component.hbase; import java.util.Map; +import java.util.concurrent.Executors; import org.apache.camel.Endpoint; import org.apache.camel.impl.UriEndpointComponent; import org.apache.camel.util.IntrospectionSupport; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HTablePool; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; /** * Represents the component that manages {@link HBaseEndpoint}. @@ -31,7 +33,7 @@ import org.apache.hadoop.hbase.client.HTablePool; public class HBaseComponent extends UriEndpointComponent { private Configuration configuration; - private HTablePool tablePool; + private Connection connection; private int poolMaxSize = 10; public HBaseComponent() { @@ -43,18 +45,22 @@ public class HBaseComponent extends UriEndpointComponent { if (configuration == null) { configuration = HBaseConfiguration.create(); } - tablePool = new HTablePool(configuration, poolMaxSize); + + connection = ConnectionFactory.createConnection( + configuration, + Executors.newFixedThreadPool(poolMaxSize) + ); } @Override protected void doStop() throws Exception { - if (tablePool != null) { - tablePool.close(); + if (connection != null) { + connection.close(); } } protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - HBaseEndpoint endpoint = new HBaseEndpoint(uri, this, tablePool, remaining); + HBaseEndpoint endpoint = new HBaseEndpoint(uri, this, connection, remaining); Map<String, Object> mapping = IntrospectionSupport.extractProperties(parameters, "row."); endpoint.setRowMapping(mapping); setProperties(endpoint, parameters); http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java index 59aaa43..b54b284 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java @@ -33,10 +33,10 @@ import org.apache.camel.impl.ScheduledBatchPollingConsumer; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.PageFilter; @@ -61,15 +61,14 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer { @Override protected int poll() throws Exception { - HTableInterface table = endpoint.getTable(); - try { + try (Table table = endpoint.getTable()) { shutdownRunningTask = null; pendingExchanges = 0; - Queue<Exchange> queue = new LinkedList<Exchange>(); + Queue<Exchange> queue = new LinkedList<>(); Scan scan = new Scan(); - List<Filter> filters = new LinkedList<Filter>(); + List<Filter> filters = new LinkedList<>(); if (endpoint.getFilters() != null) { filters.addAll(endpoint.getFilters()); } @@ -111,8 +110,10 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer { HBaseCell resultCell = new HBaseCell(); String family = modelCell.getFamily(); String column = modelCell.getQualifier(); - resultCell.setValue(endpoint.getCamelContext().getTypeConverter().convertTo(modelCell.getValueType(), - result.getValue(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column)))); + resultCell.setValue(endpoint.getCamelContext().getTypeConverter().convertTo( + modelCell.getValueType(), + result.getValue(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column))) + ); resultCell.setFamily(modelCell.getFamily()); resultCell.setQualifier(modelCell.getQualifier()); resultRow.getCells().add(resultCell); @@ -136,15 +137,13 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer { exchange.getIn().setHeader(CellMappingStrategyFactory.STRATEGY, CellMappingStrategyFactory.BODY); mappingStrategy.applyScanResults(exchange.getIn(), data); //Make sure that there is a header containing the marked row ids, so that they can be deleted. - exchange.getIn().setHeader(HbaseAttribute.HBASE_MARKED_ROW_ID.asHeader(), result.getRow()); + exchange.getIn().setHeader(HBaseAttribute.HBASE_MARKED_ROW_ID.asHeader(), result.getRow()); queue.add(exchange); exchangeCount++; } } scanner.close(); return queue.isEmpty() ? 0 : processBatch(CastUtils.cast(queue)); - } finally { - table.close(); } } @@ -177,7 +176,7 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer { } if (endpoint.isRemove()) { - remove((byte[]) exchange.getIn().getHeader(HbaseAttribute.HBASE_MARKED_ROW_ID.asHeader())); + remove((byte[]) exchange.getIn().getHeader(HBaseAttribute.HBASE_MARKED_ROW_ID.asHeader())); } } @@ -188,11 +187,8 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer { * Delegates to the {@link HBaseRemoveHandler}. */ private void remove(byte[] row) throws IOException { - HTableInterface table = endpoint.getTable(); - try { + try (Table table = endpoint.getTable()) { endpoint.getRemoveHandler().remove(table, row); - } finally { - table.close(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseContats.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseContats.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseContats.java deleted file mode 100644 index 1536f78..0000000 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseContats.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.hbase; - -@Deprecated -public final class HBaseContats { - - public static final String OPERATION = "CamelHBaseOperation"; - - public static final String PUT = "CamelHBasePut"; - public static final String GET = "CamelHBaseGet"; - public static final String SCAN = "CamelHBaseScan"; - public static final String DELETE = "CamelHBaseDelete"; - - public static final String HBASE_MAX_SCAN_RESULTS = "CamelHBaseMaxScanResults"; - - private HBaseContats() { - //Utility Class - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseDeleteHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseDeleteHandler.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseDeleteHandler.java index d1901a5..c582cc1 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseDeleteHandler.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseDeleteHandler.java @@ -17,8 +17,9 @@ package org.apache.camel.component.hbase; import java.io.IOException; + import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +31,7 @@ public class HBaseDeleteHandler implements HBaseRemoveHandler { * Performs a {@link Delete} of the specified row. */ @Override - public void remove(HTableInterface table, byte[] row) { + public void remove(Table table, byte[] row) { Delete delete = new Delete(row); try { table.delete(delete); http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java index 27c5773..81bfe7f 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.hbase; +import java.io.IOException; import java.security.PrivilegedAction; import java.util.List; import java.util.Map; @@ -32,9 +33,10 @@ import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.HTablePool; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.security.UserGroupInformation; @@ -45,7 +47,7 @@ import org.apache.hadoop.security.UserGroupInformation; public class HBaseEndpoint extends DefaultEndpoint { private Configuration configuration; - private final HTablePool tablePool; + private final Connection connection; private HBaseAdmin admin; @UriPath(description = "The name of the table") @Metadata(required = "true") @@ -80,10 +82,10 @@ public class HBaseEndpoint extends DefaultEndpoint { */ private byte[] tableNameBytes; - public HBaseEndpoint(String uri, HBaseComponent component, HTablePool tablePool, String tableName) { + public HBaseEndpoint(String uri, HBaseComponent component, Connection connection, String tableName) { super(uri, component); this.tableName = tableName; - this.tablePool = tablePool; + this.connection = connection; if (this.tableName == null) { throw new IllegalArgumentException("Table name can not be null"); } else { @@ -275,21 +277,30 @@ public class HBaseEndpoint extends DefaultEndpoint { } } + @Override + protected void doStop() throws Exception { + super.doStop(); + } + /** * Gets connection to the table (secured or not, depends on the object initialization) * please remember to close the table after use * @return table, remember to close! */ - public HTableInterface getTable() { + public Table getTable() throws IOException { if (userGroupInformation != null) { - return userGroupInformation.doAs(new PrivilegedAction<HTableInterface>() { + return userGroupInformation.doAs(new PrivilegedAction<Table>() { @Override - public HTableInterface run() { - return tablePool.getTable(tableNameBytes); + public Table run() { + try { + return connection.getTable(TableName.valueOf(tableNameBytes)); + } catch (IOException e) { + throw new RuntimeException(e); + } } }); } else { - return tablePool.getTable(tableNameBytes); + return connection.getTable(TableName.valueOf(tableNameBytes)); } } @@ -298,20 +309,20 @@ public class HBaseEndpoint extends DefaultEndpoint { */ private HBaseRow createRowModel(Map<String, Object> parameters) { HBaseRow rowModel = new HBaseRow(); - if (parameters.containsKey(HbaseAttribute.HBASE_ROW_TYPE.asOption())) { - String rowType = String.valueOf(parameters.remove(HbaseAttribute.HBASE_ROW_TYPE.asOption())); + if (parameters.containsKey(HBaseAttribute.HBASE_ROW_TYPE.asOption())) { + String rowType = String.valueOf(parameters.remove(HBaseAttribute.HBASE_ROW_TYPE.asOption())); if (rowType != null && !rowType.isEmpty()) { rowModel.setRowType(getCamelContext().getClassResolver().resolveClass(rowType)); } } - for (int i = 1; parameters.get(HbaseAttribute.HBASE_FAMILY.asOption(i)) != null - && parameters.get(HbaseAttribute.HBASE_QUALIFIER.asOption(i)) != null; i++) { + for (int i = 1; parameters.get(HBaseAttribute.HBASE_FAMILY.asOption(i)) != null + && parameters.get(HBaseAttribute.HBASE_QUALIFIER.asOption(i)) != null; i++) { HBaseCell cellModel = new HBaseCell(); - cellModel.setFamily(String.valueOf(parameters.remove(HbaseAttribute.HBASE_FAMILY.asOption(i)))); - cellModel.setQualifier(String.valueOf(parameters.remove(HbaseAttribute.HBASE_QUALIFIER.asOption(i)))); - cellModel.setValue(String.valueOf(parameters.remove(HbaseAttribute.HBASE_VALUE.asOption(i)))); - if (parameters.containsKey(HbaseAttribute.HBASE_VALUE_TYPE.asOption(i))) { - String valueType = String.valueOf(parameters.remove(HbaseAttribute.HBASE_VALUE_TYPE.asOption(i))); + cellModel.setFamily(String.valueOf(parameters.remove(HBaseAttribute.HBASE_FAMILY.asOption(i)))); + cellModel.setQualifier(String.valueOf(parameters.remove(HBaseAttribute.HBASE_QUALIFIER.asOption(i)))); + cellModel.setValue(String.valueOf(parameters.remove(HBaseAttribute.HBASE_VALUE.asOption(i)))); + if (parameters.containsKey(HBaseAttribute.HBASE_VALUE_TYPE.asOption(i))) { + String valueType = String.valueOf(parameters.remove(HBaseAttribute.HBASE_VALUE_TYPE.asOption(i))); if (valueType != null && !valueType.isEmpty()) { rowModel.setRowType(getCamelContext().getClassResolver().resolveClass(valueType)); } http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java index 93f8bb0..b9ccfce 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java @@ -30,14 +30,14 @@ import org.apache.camel.component.hbase.model.HBaseData; import org.apache.camel.component.hbase.model.HBaseRow; import org.apache.camel.impl.DefaultProducer; import org.apache.camel.util.ObjectHelper; -import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.util.Bytes; @@ -57,9 +57,7 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { } public void process(Exchange exchange) throws Exception { - HTableInterface table = endpoint.getTable(); - try { - + try (Table table = endpoint.getTable()) { updateHeaders(exchange); String operation = (String) exchange.getIn().getHeader(HBaseConstants.OPERATION); @@ -69,10 +67,10 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { HBaseData data = mappingStrategy.resolveModel(exchange.getIn()); - List<Put> putOperations = new LinkedList<Put>(); - List<Delete> deleteOperations = new LinkedList<Delete>(); - List<HBaseRow> getOperationResult = new LinkedList<HBaseRow>(); - List<HBaseRow> scanOperationResult = new LinkedList<HBaseRow>(); + List<Put> putOperations = new LinkedList<>(); + List<Delete> deleteOperations = new LinkedList<>(); + List<HBaseRow> getOperationResult = new LinkedList<>(); + List<HBaseRow> scanOperationResult = new LinkedList<>(); for (HBaseRow hRow : data.getRows()) { hRow.apply(rowModel); @@ -91,7 +89,6 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { //Check if we have something to add. if (!putOperations.isEmpty()) { table.put(putOperations); - table.flushCommits(); } else if (!deleteOperations.isEmpty()) { table.delete(deleteOperations); } else if (!getOperationResult.isEmpty()) { @@ -99,8 +96,6 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { } else if (!scanOperationResult.isEmpty()) { mappingStrategy.applyScanResults(exchange.getOut(), new HBaseData(scanOperationResult)); } - } finally { - table.close(); } } @@ -124,8 +119,11 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { ObjectHelper.notNull(family, "HBase column family", cell); ObjectHelper.notNull(column, "HBase column", cell); - put.add(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column), - endpoint.getCamelContext().getTypeConverter().convertTo(byte[].class, value)); + put.addColumn( + HBaseHelper.getHBaseFieldAsBytes(family), + HBaseHelper.getHBaseFieldAsBytes(column), + endpoint.getCamelContext().getTypeConverter().convertTo(byte[].class, value) + ); } return put; } @@ -134,9 +132,9 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { * Performs an HBase {@link Get} on a specific row, using a collection of values (family/column/value pairs). * The result is <p>the most recent entry</p> for each column. */ - private HBaseRow getCells(HTableInterface table, HBaseRow hRow) throws Exception { + private HBaseRow getCells(Table table, HBaseRow hRow) throws Exception { HBaseRow resultRow = new HBaseRow(); - List<HBaseCell> resultCells = new LinkedList<HBaseCell>(); + List<HBaseCell> resultCells = new LinkedList<>(); ObjectHelper.notNull(hRow, "HBase row"); ObjectHelper.notNull(hRow.getId(), "HBase row id"); ObjectHelper.notNull(hRow.getCells(), "HBase cells"); @@ -166,11 +164,10 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { resultCell.setFamily(family); resultCell.setQualifier(column); - List<KeyValue> kvs = result.getColumn(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column)); + List<Cell> kvs = result.getColumnCells(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column)); if (kvs != null && !kvs.isEmpty()) { //Return the most recent entry. - resultCell - .setValue(endpoint.getCamelContext().getTypeConverter().convertTo(cellModel.getValueType(), kvs.get(0).getValue())); + resultCell.setValue(endpoint.getCamelContext().getTypeConverter().convertTo(cellModel.getValueType(), kvs.get(0).getValue())); resultCell.setTimestamp(kvs.get(0).getTimestamp()); } resultCells.add(resultCell); @@ -192,9 +189,9 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { * Performs an HBase {@link Get} on a specific row, using a collection of values (family/column/value pairs). * The result is <p>the most recent entry</p> for each column. */ - private List<HBaseRow> scanCells(HTableInterface table, HBaseRow model, String start, Integer maxRowScan, List<Filter> filters) + private List<HBaseRow> scanCells(Table table, HBaseRow model, String start, Integer maxRowScan, List<Filter> filters) throws Exception { - List<HBaseRow> rowSet = new LinkedList<HBaseRow>(); + List<HBaseRow> rowSet = new LinkedList<>(); HBaseRow startRow = new HBaseRow(model.getCells()); startRow.setId(start); @@ -237,16 +234,22 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { HBaseCell resultCell = new HBaseCell(); String family = modelCell.getFamily(); String column = modelCell.getQualifier(); - resultRow.setId(endpoint.getCamelContext().getTypeConverter().convertTo(model.getRowType(), result.getRow())); - resultCell.setValue(endpoint.getCamelContext().getTypeConverter().convertTo(modelCell.getValueType(), - result.getValue(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column)))); + + resultRow.setId(endpoint.getCamelContext().getTypeConverter().convertTo( + model.getRowType(), + result.getRow()) + ); + resultCell.setValue(endpoint.getCamelContext().getTypeConverter().convertTo( + modelCell.getValueType(), + result.getValue(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column))) + ); + resultCell.setFamily(modelCell.getFamily()); resultCell.setQualifier(modelCell.getQualifier()); - if (result.getColumnLatest(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column)) != null) { - resultCell.setTimestamp( - result.getColumnLatest(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column)) - .getTimestamp()); + Cell cell = result.getColumnLatestCell(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column)); + if (cell != null) { + resultCell.setTimestamp(cell.getTimestamp()); } resultRow.getCells().add(resultCell); } http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseRemoveHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseRemoveHandler.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseRemoveHandler.java index 18e3a63..0ea8d69 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseRemoveHandler.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseRemoveHandler.java @@ -16,7 +16,7 @@ */ package org.apache.camel.component.hbase; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; public interface HBaseRemoveHandler { @@ -25,5 +25,5 @@ public interface HBaseRemoveHandler { * The removal is not necessarily physical remove. * The implementation decides how a row can be considered as removed. */ - void remove(HTableInterface table, byte[] row); + void remove(Table table, byte[] row); } http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HbaseAttribute.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HbaseAttribute.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HbaseAttribute.java deleted file mode 100644 index d099096..0000000 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HbaseAttribute.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.hbase; - -public enum HbaseAttribute { - - HBASE_ROW_ID("CamelHBaseRowId"), - HBASE_ROW_TYPE("CamelHBaseRowType"), - HBASE_MARKED_ROW_ID("CamelHBaseMarkedRowId"), - HBASE_FAMILY("CamelHBaseFamily"), - HBASE_QUALIFIER("CamelHBaseQualifier"), - HBASE_VALUE("CamelHBaseValue"), - HBASE_VALUE_TYPE("CamelHBaseValueType"); - - private final String value; - - private HbaseAttribute(String value) { - this.value = value; - } - - public String asHeader(int i) { - if (i > 1) { - return value + i; - } else { - return value; - } - } - - public String asHeader() { - return value; - } - - public String asOption() { - String normalizedValue = value.replaceAll("CamelHBase", ""); - return normalizedValue.substring(0, 1).toLowerCase() + normalizedValue.substring(1); - } - - public String asOption(int i) { - String option = asOption(); - if (i > 1) { - return option + i; - } else { - return option; - } - } - - @Override - public String toString() { - return value; - } -} http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategy.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategy.java index 47ffba2..56b352d 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategy.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategy.java @@ -34,7 +34,7 @@ public interface CellMappingStrategy { HBaseData resolveModel(Message message); /** - * Applies the KeyValues of a get opration to the {@link Exchange}. + * Applies the KeyValues of a get operation to the {@link Exchange}. * * @param message The message that will be applied the Get result. * @param data The rows that will be applied to the message. http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategyFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategyFactory.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategyFactory.java index e51e8c3..e795f22 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategyFactory.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/CellMappingStrategyFactory.java @@ -31,11 +31,11 @@ public class CellMappingStrategyFactory { public static final String BODY = "body"; private static final Logger LOG = LoggerFactory.getLogger(CellMappingStrategyFactory.class); - private static final Map<String, CellMappingStrategy> DEFAULT_STRATIGIES = new HashMap<String, CellMappingStrategy>(); + private static final Map<String, CellMappingStrategy> DEFAULT_STRATEGIES = new HashMap<String, CellMappingStrategy>(); public CellMappingStrategyFactory() { - DEFAULT_STRATIGIES.put(HEADER, new HeaderMappingStrategy()); - DEFAULT_STRATIGIES.put(BODY, new BodyMappingStrategy()); + DEFAULT_STRATEGIES.put(HEADER, new HeaderMappingStrategy()); + DEFAULT_STRATEGIES.put(BODY, new BodyMappingStrategy()); } public CellMappingStrategy getStrategy(Message message) { @@ -43,7 +43,7 @@ public class CellMappingStrategyFactory { //Check if strategy has been explicitly set. if (message.getHeader(STRATEGY) != null) { - strategy = DEFAULT_STRATIGIES.get(message.getHeader(STRATEGY, String.class)); + strategy = DEFAULT_STRATEGIES.get(message.getHeader(STRATEGY, String.class)); } if (strategy == null && message.getHeader(STRATEGY_CLASS_NAME) != null) { @@ -55,7 +55,7 @@ public class CellMappingStrategyFactory { } //Fallback to the default strategy. - return DEFAULT_STRATIGIES.get(HEADER); + return DEFAULT_STRATEGIES.get(HEADER); } private CellMappingStrategy loadStrategyFromClassName(String strategyClassName) { http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java index d8c54bd..947f973 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java @@ -21,7 +21,7 @@ import java.util.Map; import java.util.Set; import org.apache.camel.Exchange; import org.apache.camel.Message; -import org.apache.camel.component.hbase.HbaseAttribute; +import org.apache.camel.component.hbase.HBaseAttribute; import org.apache.camel.component.hbase.model.HBaseCell; import org.apache.camel.component.hbase.model.HBaseData; import org.apache.camel.component.hbase.model.HBaseRow; @@ -47,14 +47,14 @@ public class HeaderMappingStrategy implements CellMappingStrategy { HBaseCell hCell = new HBaseCell(); if (message != null) { - Object id = message.getHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(index)); - String rowClassName = message.getHeader(HbaseAttribute.HBASE_ROW_TYPE.asHeader(index), String.class); + Object id = message.getHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(index)); + String rowClassName = message.getHeader(HBaseAttribute.HBASE_ROW_TYPE.asHeader(index), String.class); Class<?> rowClass = rowClassName == null || rowClassName.isEmpty() ? String.class : message.getExchange().getContext().getClassResolver().resolveClass(rowClassName); - String columnFamily = (String) message.getHeader(HbaseAttribute.HBASE_FAMILY.asHeader(index)); - String columnName = (String) message.getHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(index)); - Object value = message.getHeader(HbaseAttribute.HBASE_VALUE.asHeader(index)); + String columnFamily = (String) message.getHeader(HBaseAttribute.HBASE_FAMILY.asHeader(index)); + String columnName = (String) message.getHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(index)); + Object value = message.getHeader(HBaseAttribute.HBASE_VALUE.asHeader(index)); - String valueClassName = message.getHeader(HbaseAttribute.HBASE_VALUE_TYPE.asHeader(index), String.class); + String valueClassName = message.getHeader(HBaseAttribute.HBASE_VALUE_TYPE.asHeader(index), String.class); Class<?> valueClass = valueClassName == null || valueClassName.isEmpty() ? String.class : message.getExchange().getContext().getClassResolver().resolveClass(valueClassName); //Id can be accepted as null when using get, scan etc. @@ -84,7 +84,7 @@ public class HeaderMappingStrategy implements CellMappingStrategy { int index = 1; HBaseData data = new HBaseData(); //We use a LinkedHashMap to preserve the order. - Map<Object, HBaseRow> rows = new LinkedHashMap<Object, HBaseRow>(); + Map<Object, HBaseRow> rows = new LinkedHashMap<>(); HBaseRow hRow = new HBaseRow(); while (hRow != null) { hRow = resolveRow(message, index++); @@ -116,7 +116,7 @@ public class HeaderMappingStrategy implements CellMappingStrategy { if (hRow.getId() != null) { Set<HBaseCell> cells = hRow.getCells(); for (HBaseCell cell : cells) { - message.setHeader(HbaseAttribute.HBASE_VALUE.asHeader(index++), getValueForColumn(cells, cell.getFamily(), cell.getQualifier())); + message.setHeader(HBaseAttribute.HBASE_VALUE.asHeader(index++), getValueForColumn(cells, cell.getFamily(), cell.getQualifier())); } } } @@ -136,10 +136,10 @@ public class HeaderMappingStrategy implements CellMappingStrategy { for (HBaseRow hRow : data.getRows()) { Set<HBaseCell> cells = hRow.getCells(); for (HBaseCell cell : cells) { - message.setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(index), hRow.getId()); - message.setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(index), cell.getFamily()); - message.setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(index), cell.getQualifier()); - message.setHeader(HbaseAttribute.HBASE_VALUE.asHeader(index), cell.getValue()); + message.setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(index), hRow.getId()); + message.setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(index), cell.getFamily()); + message.setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(index), cell.getQualifier()); + message.setHeader(HBaseAttribute.HBASE_VALUE.asHeader(index), cell.getValue()); } index++; } http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java index 05900a1..92d0198 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java @@ -16,23 +16,22 @@ */ package org.apache.camel.component.hbase.processor.idempotent; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.ObjectOutputStream; import org.apache.camel.component.hbase.HBaseHelper; import org.apache.camel.spi.IdempotentRepository; import org.apache.camel.support.ServiceSupport; -import org.apache.camel.util.IOHelper; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.client.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,15 +41,18 @@ public class HBaseIdempotentRepository extends ServiceSupport implements Idempot private final String tableName; private final String family; - private final String qualifer; - private final HTable table; + private final String qualifier; + private final Configuration configuration; + private Connection connection; + private Table table; public HBaseIdempotentRepository(Configuration configuration, String tableName, String family, String qualifier) throws IOException { this.tableName = tableName; this.family = family; - this.qualifer = qualifier; - //In the case of idempotent repository we do not want to catch exceptions related to HTable. - this.table = new HTable(configuration, tableName); + this.qualifier = qualifier; + this.configuration = configuration; + this.connection = null; + this.table = null; } @Override @@ -60,11 +62,10 @@ public class HBaseIdempotentRepository extends ServiceSupport implements Idempot if (contains(o)) { return false; } - byte[] b = toBytes(o); + byte[] b = HBaseHelper.toBytes(o); Put put = new Put(b); - put.add(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(qualifer), b); + put.addColumn(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(qualifier), b); table.put(put); - table.flushCommits(); return true; } } catch (Exception e) { @@ -76,9 +77,9 @@ public class HBaseIdempotentRepository extends ServiceSupport implements Idempot @Override public boolean contains(Object o) { try { - byte[] b = toBytes(o); + byte[] b = HBaseHelper.toBytes(o); Get get = new Get(b); - get.addColumn(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(qualifer)); + get.addColumn(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(qualifier)); return table.exists(get); } catch (Exception e) { LOG.warn("Error reading object {} from HBase repository.", o); @@ -89,7 +90,7 @@ public class HBaseIdempotentRepository extends ServiceSupport implements Idempot @Override public boolean remove(Object o) { try { - byte[] b = toBytes(o); + byte[] b = HBaseHelper.toBytes(o); if (table.exists(new Get(b))) { Delete delete = new Delete(b); table.delete(delete); @@ -125,43 +126,18 @@ public class HBaseIdempotentRepository extends ServiceSupport implements Idempot @Override protected void doStart() throws Exception { - // noop + this.connection = ConnectionFactory.createConnection(configuration); + this.table = this.connection.getTable(TableName.valueOf(tableName)); } @Override protected void doStop() throws Exception { - // noop - } + if (table != null) { + table.close(); + } - private byte[] toBytes(Object obj) { - if (obj instanceof byte[]) { - return (byte[]) obj; - } else if (obj instanceof Byte) { - return Bytes.toBytes((Byte) obj); - } else if (obj instanceof Short) { - return Bytes.toBytes((Short) obj); - } else if (obj instanceof Integer) { - return Bytes.toBytes((Integer) obj); - } else if (obj instanceof Long) { - return Bytes.toBytes((Long) obj); - } else if (obj instanceof Double) { - return Bytes.toBytes((Double) obj); - } else if (obj instanceof String) { - return Bytes.toBytes((String) obj); - } else { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = null; - try { - oos = new ObjectOutputStream(baos); - oos.writeObject(obj); - return baos.toByteArray(); - } catch (IOException e) { - LOG.warn("Error while serializing object. Null will be used.", e); - return null; - } finally { - IOHelper.close(oos); - IOHelper.close(baos); - } + if (connection != null) { + connection.close(); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java index 5cc0286..0079ff3 100644 --- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java +++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java @@ -50,15 +50,15 @@ public class CamelHBaseFilterTest extends CamelHBaseTestSupport { Endpoint endpoint = context.getEndpoint("direct:scan"); Exchange exchange = endpoint.createExchange(ExchangePattern.InOut); - exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]); - exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]); - exchange.getIn().setHeader(HbaseAttribute.HBASE_VALUE.asHeader(), body[0][0][0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_VALUE.asHeader(), body[0][0][0]); Exchange resp = template.send(endpoint, exchange); Message out = resp.getOut(); assertTrue("two first keys returned", - out.getHeaders().containsValue(body[0][0][0]) - && out.getHeaders().containsValue(body[1][0][0]) - && !out.getHeaders().containsValue(body[2][0][0])); + out.getHeaders().containsValue(body[0][0][0]) + && out.getHeaders().containsValue(body[1][0][0]) + && !out.getHeaders().containsValue(body[2][0][0])); } } @@ -73,7 +73,6 @@ public class CamelHBaseFilterTest extends CamelHBaseTestSupport { public void configure() { from("direct:start") .to("hbase://" + PERSON_TABLE); - from("direct:scan") .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2"); } http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java index 937ef22..343852d 100644 --- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java +++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java @@ -25,8 +25,11 @@ import org.apache.camel.util.IOHelper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableExistsException; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -49,7 +52,6 @@ public abstract class CamelHBaseTestSupport extends CamelTestSupport { protected String[] key = {"1", "2", "3"}; protected final String[] family = {"info", "birthdate", "address"}; - //comlumn[family][column] protected final String[][] column = { {"id", "firstName", "lastName"}, {"day", "month", "year"}, @@ -117,7 +119,9 @@ public abstract class CamelHBaseTestSupport extends CamelTestSupport { protected void putMultipleRows() throws IOException { Configuration configuration = hbaseUtil.getHBaseAdmin().getConfiguration(); - HTable table = new HTable(configuration, PERSON_TABLE.getBytes()); + Connection connection = ConnectionFactory.createConnection(configuration); + Table table = connection.getTable(TableName.valueOf(PERSON_TABLE.getBytes())); + for (int r = 0; r < key.length; r++) { Put put = new Put(key[r].getBytes()); put.add(family[0].getBytes(), column[0][0].getBytes(), body[r][0][0].getBytes()); http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java index deda182..44e2461 100644 --- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java +++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java @@ -31,13 +31,12 @@ public class HBaseConsumerTest extends CamelHBaseTestSupport { MockEndpoint mockEndpoint = getMockEndpoint("mock:result"); mockEndpoint.expectedMessageCount(3); - Map<String, Object> headers = new HashMap<String, Object>(); - + Map<String, Object> headers = new HashMap<>(); for (int row = 0; row < key.length; row++) { - headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(row + 1), key[row]); - headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(row + 1), family[0]); - headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(row + 1), column[0][0]); - headers.put(HbaseAttribute.HBASE_VALUE.asHeader(row + 1), body[row][0][0]); + headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(row + 1), key[row]); + headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(row + 1), family[0]); + headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(row + 1), column[0][0]); + headers.put(HBaseAttribute.HBASE_VALUE.asHeader(row + 1), body[row][0][0]); } headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT); @@ -58,7 +57,6 @@ public class HBaseConsumerTest extends CamelHBaseTestSupport { public void configure() { from("direct:start") .to("hbase://" + PERSON_TABLE); - from("hbase://" + PERSON_TABLE) .to("mock:result"); } http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java index 9813744..3535ad4 100644 --- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java +++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java @@ -41,20 +41,20 @@ public class HBaseConvertionsTest extends CamelHBaseTestSupport { if (systemReady) { ProducerTemplate template = context.createProducerTemplate(); Map<String, Object> headers = new HashMap<String, Object>(); - headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]); - headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(), INFO_FAMILY); - headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0]); - headers.put(HbaseAttribute.HBASE_VALUE.asHeader(), body[0]); + headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]); + headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(), INFO_FAMILY); + headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0]); + headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), body[0]); - headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(2), key[1]); - headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(2), INFO_FAMILY); - headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(2), column[0]); - headers.put(HbaseAttribute.HBASE_VALUE.asHeader(2), body[1]); + headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(2), key[1]); + headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(2), INFO_FAMILY); + headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(2), column[0]); + headers.put(HBaseAttribute.HBASE_VALUE.asHeader(2), body[1]); - headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(3), key[2]); - headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(3), INFO_FAMILY); - headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(3), column[0]); - headers.put(HbaseAttribute.HBASE_VALUE.asHeader(3), body[2]); + headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(3), key[2]); + headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(3), INFO_FAMILY); + headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(3), column[0]); + headers.put(HBaseAttribute.HBASE_VALUE.asHeader(3), body[2]); headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT); @@ -99,7 +99,6 @@ public class HBaseConvertionsTest extends CamelHBaseTestSupport { public void configure() { from("direct:start") .to("hbase://" + PERSON_TABLE); - from("direct:scan") .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2&row.family=family1&row.qualifier=column1"); } http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java index fa3f229..cac4539 100644 --- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java +++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java @@ -26,9 +26,13 @@ import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.util.IOHelper; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; import org.junit.Test; public class HBaseProducerTest extends CamelHBaseTestSupport { @@ -36,16 +40,18 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { @Test public void testPut() throws Exception { if (systemReady) { - Map<String, Object> headers = new HashMap<String, Object>(); - headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]); - headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]); - headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]); - headers.put(HbaseAttribute.HBASE_VALUE.asHeader(), body[0][0][0]); + Map<String, Object> headers = new HashMap<>(); + headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]); + headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]); + headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]); + headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), body[0][0][0]); headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT); template.sendBodyAndHeaders("direct:start", null, headers); Configuration configuration = hbaseUtil.getHBaseAdmin().getConfiguration(); - HTable table = new HTable(configuration, PERSON_TABLE.getBytes()); + Connection connection = ConnectionFactory.createConnection(configuration); + Table table = connection.getTable(TableName.valueOf(PERSON_TABLE.getBytes())); + Get get = new Get(key[0].getBytes()); get.addColumn(family[0].getBytes(), column[0][0].getBytes()); @@ -63,30 +69,30 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { if (systemReady) { Exchange resp = template.request("direct:start", new Processor() { public void process(Exchange exchange) throws Exception { - exchange.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]); - exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]); - exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]); exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET); } }); - assertEquals(body[0][0][0], resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader())); + assertEquals(body[0][0][0], resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader())); } } @Test public void testPutAndGetWithModel() throws Exception { if (systemReady) { - Map<String, Object> headers = new HashMap<String, Object>(); + Map<String, Object> headers = new HashMap<>(); headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT); int index = 1; for (int row = 0; row < key.length; row++) { for (int fam = 0; fam < family.length; fam++) { for (int col = 0; col < column[fam].length; col++) { - headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(index), key[row]); - headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(index), family[fam]); - headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(index), column[fam][col]); - headers.put(HbaseAttribute.HBASE_VALUE.asHeader(index++), body[row][fam][col]); + headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(index), key[row]); + headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(index), family[fam]); + headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(index), column[fam][col]); + headers.put(HBaseAttribute.HBASE_VALUE.asHeader(index++), body[row][fam][col]); } } } @@ -95,26 +101,26 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { Exchange resp = template.request("direct:start-with-model", new Processor() { public void process(Exchange exchange) throws Exception { - exchange.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]); exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET); } }); - assertEquals(body[0][0][1], resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader())); - assertEquals(body[0][1][2], resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(2))); + assertEquals(body[0][0][1], resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader())); + assertEquals(body[0][1][2], resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2))); } } @Test public void testPutMultiRows() throws Exception { if (systemReady) { - Map<String, Object> headers = new HashMap<String, Object>(); + Map<String, Object> headers = new HashMap<>(); headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT); for (int row = 0; row < key.length; row++) { - headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(row + 1), key[row]); - headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(row + 1), family[0]); - headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(row + 1), column[0][0]); - headers.put(HbaseAttribute.HBASE_VALUE.asHeader(row + 1), body[row][0][0]); + headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(row + 1), key[row]); + headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(row + 1), family[0]); + headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(row + 1), column[0][0]); + headers.put(HBaseAttribute.HBASE_VALUE.asHeader(row + 1), body[row][0][0]); } template.sendBodyAndHeaders("direct:start", null, headers); @@ -143,15 +149,15 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { public void process(Exchange exchange) throws Exception { exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET); for (int row = 0; row < key.length; row++) { - exchange.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(row + 1), key[row]); - exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(row + 1), family[0]); - exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(row + 1), column[0][0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(row + 1), key[row]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(row + 1), family[0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(row + 1), column[0][0]); } } }); for (int row = 0; row < key.length; row++) { - assertEquals(body[row][0][0], resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(row + 1))); + assertEquals(body[row][0][0], resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(row + 1))); } } } @@ -159,19 +165,20 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { @Test public void testPutMultiColumns() throws Exception { if (systemReady) { - Map<String, Object> headers = new HashMap<String, Object>(); + Map<String, Object> headers = new HashMap<>(); headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT); for (int col = 0; col < column[0].length; col++) { - headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(col + 1), key[0]); - headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(col + 1), family[0]); - headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(col + 1), column[0][col]); - headers.put(HbaseAttribute.HBASE_VALUE.asHeader(col + 1), body[0][col][0]); + headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(col + 1), key[0]); + headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(col + 1), family[0]); + headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(col + 1), column[0][col]); + headers.put(HBaseAttribute.HBASE_VALUE.asHeader(col + 1), body[0][col][0]); } template.sendBodyAndHeaders("direct:start", null, headers); Configuration configuration = hbaseUtil.getHBaseAdmin().getConfiguration(); - HTable bar = new HTable(configuration, PERSON_TABLE.getBytes()); + Connection connection = ConnectionFactory.createConnection(configuration); + Table bar = connection.getTable(TableName.valueOf(PERSON_TABLE.getBytes())); for (int col = 0; col < column[0].length; col++) { Get get = new Get(key[0].getBytes()); @@ -193,15 +200,15 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { public void process(Exchange exchange) throws Exception { exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET); for (int col = 0; col < column[0].length; col++) { - exchange.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(col + 1), key[0]); - exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(col + 1), family[0]); - exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(col + 1), column[0][col]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(col + 1), key[0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(col + 1), family[0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(col + 1), column[0][col]); } } }); for (int col = 0; col < column[0].length; col++) { - assertEquals(body[0][col][0], resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(col + 1))); + assertEquals(body[0][col][0], resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(col + 1))); } } } @@ -210,25 +217,25 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { public void testPutAndGetAndDeleteMultiRows() throws Exception { testPutMultiRows(); if (systemReady) { - Map<String, Object> headers = new HashMap<String, Object>(); + Map<String, Object> headers = new HashMap<>(); headers.put(HBaseConstants.OPERATION, HBaseConstants.DELETE); - headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]); + headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]); template.sendBodyAndHeaders("direct:start", null, headers); Exchange resp = template.request("direct:start", new Processor() { public void process(Exchange exchange) throws Exception { exchange.getIn().setHeader(HBaseConstants.OPERATION, HBaseConstants.GET); - exchange.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]); - exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]); - exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]); - exchange.getIn().setHeader(HbaseAttribute.HBASE_ROW_ID.asHeader(2), key[1]); - exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(2), family[0]); - exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(2), column[0][0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(), key[0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_ROW_ID.asHeader(2), key[1]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(2), family[0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(2), column[0][0]); } }); - assertEquals(null, resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader())); - assertEquals(body[1][0][0], resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(2))); + assertEquals(null, resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader())); + assertEquals(body[1][0][0], resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2))); } } @@ -238,15 +245,15 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { if (systemReady) { Exchange resp = template.request("direct:maxScan", new Processor() { public void process(Exchange exchange) throws Exception { - exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]); - exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]); } }); - Object result1 = resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(1)); - Object result2 = resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(2)); + Object result1 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(1)); + Object result2 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2)); // as we use maxResults=2 we only get 2 results back - Object result3 = resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(3)); + Object result3 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(3)); assertNull("Should only get 2 results back", result3); List<?> bodies = Arrays.asList(body[0][0][0], body[1][0][0]); @@ -260,14 +267,14 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { if (systemReady) { Exchange resp = template.request("direct:scan", new Processor() { public void process(Exchange exchange) throws Exception { - exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]); - exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]); } }); - Object result1 = resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(1)); - Object result2 = resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(2)); - Object result3 = resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(3)); + Object result1 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(1)); + Object result2 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2)); + Object result3 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(3)); List<?> bodies = Arrays.asList(body[0][0][0], body[1][0][0], body[2][0][0]); assertTrue(bodies.contains(result1) && bodies.contains(result2) && bodies.contains(result3)); @@ -277,12 +284,12 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { @Test public void testPutAndScan() throws Exception { if (systemReady) { - Map<String, Object> headers = new HashMap<String, Object>(); + Map<String, Object> headers = new HashMap<>(); headers.put(HBaseConstants.OPERATION, HBaseConstants.PUT); - headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(), "1"); - headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(), "info"); - headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(), "id"); - headers.put(HbaseAttribute.HBASE_VALUE.asHeader(), "3"); + headers.put(HBaseAttribute.HBASE_ROW_ID.asHeader(), "1"); + headers.put(HBaseAttribute.HBASE_FAMILY.asHeader(), "info"); + headers.put(HBaseAttribute.HBASE_QUALIFIER.asHeader(), "id"); + headers.put(HBaseAttribute.HBASE_VALUE.asHeader(), "3"); template.sendBodyAndHeaders("direct:start", null, headers); @@ -298,15 +305,15 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { Exchange resp = template.request("direct:scan", new Processor() { public void process(Exchange exchange) throws Exception { - exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), "info"); - exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), "id"); + exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), "info"); + exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), "id"); } }); - assertEquals("1", resp.getOut().getHeader(HbaseAttribute.HBASE_ROW_ID.asHeader())); - assertEquals("info", resp.getOut().getHeader(HbaseAttribute.HBASE_FAMILY.asHeader())); - assertEquals("id", resp.getOut().getHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader())); - assertEquals("3", resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader())); + assertEquals("1", resp.getOut().getHeader(HBaseAttribute.HBASE_ROW_ID.asHeader())); + assertEquals("info", resp.getOut().getHeader(HBaseAttribute.HBASE_FAMILY.asHeader())); + assertEquals("id", resp.getOut().getHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader())); + assertEquals("3", resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader())); } } @@ -321,13 +328,10 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { public void configure() { from("direct:start") .to("hbase://" + PERSON_TABLE); - from("direct:start-with-model") .to("hbase://" + PERSON_TABLE + "?row.family=info&row.qualifier=firstName&row.family2=birthdate&row.qualifier2=year"); - from("direct:scan") - .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN); - + .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN); from("direct:maxScan") .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2"); } http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java index 1baf1de..cea9a2f 100644 --- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java +++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java @@ -24,7 +24,6 @@ import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.spi.IdempotentRepository; import org.apache.hadoop.hbase.TableExistsException; -import org.apache.hadoop.hbase.client.HTable; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -32,7 +31,6 @@ import org.junit.Test; public class HBaseIdempotentRepositoryTest extends CamelHBaseTestSupport { IdempotentRepository<Object> repository; - HTable table; private String key01 = "123"; private String key02 = "456"; @@ -46,7 +44,6 @@ public class HBaseIdempotentRepositoryTest extends CamelHBaseTestSupport { //Ignore if table exists } this.repository = new HBaseIdempotentRepository(hbaseUtil.getConfiguration(), PERSON_TABLE, INFO_FAMILY, "mycolumn"); - table = new HTable(hbaseUtil.getConfiguration(), PERSON_TABLE); super.setUp(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/components/camel-hbase/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/test/resources/log4j.properties b/components/camel-hbase/src/test/resources/log4j.properties index 306ab49..534faf4 100644 --- a/components/camel-hbase/src/test/resources/log4j.properties +++ b/components/camel-hbase/src/test/resources/log4j.properties @@ -25,7 +25,6 @@ log4j.logger.org.apache.camel.component.hbase=WARN, out #log4j.logger.org.apache.camel=DEBUG #log4j.logger.org.apache.camel.component.hbase=TRACE - # CONSOLE appender not used by default log4j.appender.out=org.apache.log4j.ConsoleAppender log4j.appender.out.layout=org.apache.log4j.PatternLayout http://git-wip-us.apache.org/repos/asf/camel/blob/bb3a75ef/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index d09fa6d..b5bcac2 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -224,7 +224,7 @@ <hawtdb-version>1.6</hawtdb-version> <hawtdispatch-version>1.21</hawtdispatch-version> <hazelcast-version>3.6</hazelcast-version> - <hbase-version>1.1.1</hbase-version> + <hbase-version>1.1.3</hbase-version> <hbase-bundle-version>1.1.1_1</hbase-bundle-version> <hibernate-validator-version>5.2.4.Final</hibernate-validator-version> <!-- Spring 3.2.x and 4.0.x still stick to JPA 2.0. Hibernate 4.3.x upgraded to JPA 2.1. -->