CAMEL-8268: Various camel-hbase fixes and improvements. Thanks to Emilien for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9edcfbc1 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9edcfbc1 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9edcfbc1 Branch: refs/heads/master Commit: 9edcfbc1bfeeeb3625fb91815f2a8a117679fd16 Parents: e7c6529 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Feb 12 07:15:20 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Feb 12 07:15:20 2015 +0100 ---------------------------------------------------------------------- .../camel/component/hbase/HBaseConstants.java | 3 +- .../camel/component/hbase/HBaseEndpoint.java | 4 +- .../camel/component/hbase/HBaseProducer.java | 69 +++++++++++++------- .../ModelAwareRowPrefixMatchingFilter.java | 50 ++++++++++++++ .../hbase/mapping/HeaderMappingStrategy.java | 2 + .../camel/component/hbase/model/HBaseCell.java | 27 +++++++- .../camel/component/hbase/model/HBaseRow.java | 10 +++ .../component/hbase/HBaseProducerTest.java | 25 +++++++ 8 files changed, 160 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9edcfbc1/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java index c91a4d6..42ae4fc 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java @@ -26,5 +26,6 @@ public interface HBaseConstants { String DELETE = "CamelHBaseDelete"; String HBASE_MAX_SCAN_RESULTS = "CamelHBaseMaxScanResults"; - + + String FROM_ROW = 
"CamelHBaseStartRow"; } http://git-wip-us.apache.org/repos/asf/camel/blob/9edcfbc1/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java index ff547ef..11587fb 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java @@ -39,11 +39,11 @@ import org.apache.hadoop.hbase.filter.Filter; public class HBaseEndpoint extends DefaultEndpoint { private Configuration configuration; - @UriPath - private final String tableName; private final HTablePool tablePool; private HBaseAdmin admin; + @UriPath + private final String tableName; //Operation properties. @UriParam(defaultValue = "100") private int maxResults = 100; http://git-wip-us.apache.org/repos/asf/camel/blob/9edcfbc1/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java index b9c5ebb..053309f 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java @@ -19,6 +19,7 @@ package org.apache.camel.component.hbase; import java.util.LinkedList; import java.util.List; import java.util.Set; + import org.apache.camel.Exchange; import org.apache.camel.ServicePoolAware; import org.apache.camel.component.hbase.filters.ModelAwareFilter; @@ -29,7 +30,6 @@ import 
org.apache.camel.component.hbase.model.HBaseData; import org.apache.camel.component.hbase.model.HBaseRow; import org.apache.camel.impl.DefaultProducer; import org.apache.camel.util.ObjectHelper; - import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.util.Bytes; /** * The HBase producer. @@ -66,6 +67,9 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { updateHeaders(exchange); String operation = (String) exchange.getIn().getHeader(HBaseConstants.OPERATION); + + Integer maxScanResult = exchange.getIn().getHeader(HBaseConstants.HBASE_MAX_SCAN_RESULTS, Integer.class); + String fromRowId = (String) exchange.getIn().getHeader(HBaseConstants.FROM_ROW); CellMappingStrategy mappingStrategy = endpoint.getCellMappingStrategyFactory().getStrategy(exchange.getIn()); HBaseData data = mappingStrategy.resolveModel(exchange.getIn()); @@ -85,7 +89,7 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { } else if (HBaseConstants.DELETE.equals(operation)) { deleteOperations.add(createDeleteRow(hRow)); } else if (HBaseConstants.SCAN.equals(operation)) { - scanOperationResult = scanCells(table, hRow, endpoint.getFilters()); + scanOperationResult = scanCells(table, hRow, fromRowId, maxScanResult, endpoint.getFilters()); } } @@ -131,13 +135,8 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { } /** - * Perfoms an HBase {@link Get} on a specific row, using a collection of values (family/column/value pairs). + * Performs an HBase {@link Get} on a specific row, using a collection of values (family/column/value pairs). * The result is <p>the most recent entry</p> for each column. 
- * - * @param table - * @param hRow - * @return - * @throws Exception */ private HBaseRow getCells(HTableInterface table, HBaseRow hRow) throws Exception { HBaseRow resultRow = new HBaseRow(); @@ -160,6 +159,10 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { Result result = table.get(get); + if (!result.isEmpty()) { + resultRow.setTimestamp(result.raw()[0].getTimestamp()); + } + for (HBaseCell cellModel : cellModels) { HBaseCell resultCell = new HBaseCell(); String family = cellModel.getFamily(); @@ -171,6 +174,7 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { if (kvs != null && !kvs.isEmpty()) { //Return the most recent entry. resultCell.setValue(endpoint.getCamelContext().getTypeConverter().convertTo(cellModel.getValueType(), kvs.get(0).getValue())); + resultCell.setTimestamp(kvs.get(0).getTimestamp()); } resultCells.add(resultCell); resultRow.getCells().add(resultCell); @@ -180,9 +184,6 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { /** * Creates an HBase {@link Delete} on a specific row, using a collection of values (family/column/value pairs). - * - * @param hRow - * @throws Exception */ private Delete createDeleteRow(HBaseRow hRow) throws Exception { ObjectHelper.notNull(hRow, "HBase row"); @@ -191,25 +192,37 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { } /** - * Perfoms an HBase {@link Get} on a specific row, using a collection of values (family/column/value pairs). + * Performs an HBase {@link Get} on a specific row, using a collection of values (family/column/value pairs). * The result is <p>the most recent entry</p> for each column. 
- * - * @param table - * @param model - * @return - * @throws Exception */ - private List<HBaseRow> scanCells(HTableInterface table, HBaseRow model, List<Filter> filters) throws Exception { + private List<HBaseRow> scanCells(HTableInterface table, HBaseRow model, String start, Integer maxRowScan, List<Filter> filters) throws Exception { List<HBaseRow> rowSet = new LinkedList<HBaseRow>(); - Scan scan = new Scan(); + + HBaseRow startRow = new HBaseRow(model.getCells()); + startRow.setId(start); + + Scan scan; + if (start != null) { + scan = new Scan(Bytes.toBytes(start)); + } else { + scan = new Scan(); + } + + // need to clone the filters as they are not thread safe to use if (filters != null && !filters.isEmpty()) { + List<Filter> clonedFilters = new LinkedList<Filter>(); for (Filter filter : filters) { if (ModelAwareFilter.class.isAssignableFrom(filter.getClass())) { - ((ModelAwareFilter<?>) filter).apply(endpoint.getCamelContext(), model); + Object clone = endpoint.getCamelContext().getInjector().newInstance(filter.getClass()); + if (clone instanceof ModelAwareFilter) { + ((ModelAwareFilter<?>) clone).apply(endpoint.getCamelContext(), model); + clonedFilters.add((ModelAwareFilter<?>) clone); + } } } - scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, filters)); + scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, clonedFilters)); } + Set<HBaseCell> cellModels = model.getCells(); for (HBaseCell cellModel : cellModels) { String family = cellModel.getFamily(); @@ -221,10 +234,14 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { } ResultScanner resultScanner = table.getScanner(scan); + int count = 0; Result result = resultScanner.next(); - while (result != null) { + + while (result != null && count < maxRowScan) { HBaseRow resultRow = new HBaseRow(); resultRow.setId(endpoint.getCamelContext().getTypeConverter().convertTo(model.getRowType(), result.getRow())); + + 
resultRow.setTimestamp(result.raw()[0].getTimestamp()); cellModels = model.getCells(); for (HBaseCell modelCell : cellModels) { HBaseCell resultCell = new HBaseCell(); @@ -235,10 +252,14 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { result.getValue(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column)))); resultCell.setFamily(modelCell.getFamily()); resultCell.setQualifier(modelCell.getQualifier()); + + if (result.getColumnLatest(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column)) != null) { + resultCell.setTimestamp(result.getColumnLatest(HBaseHelper.getHBaseFieldAsBytes(family), HBaseHelper.getHBaseFieldAsBytes(column)).getTimestamp()); + } resultRow.getCells().add(resultCell); - rowSet.add(resultRow); } - + rowSet.add(resultRow); + count++; result = resultScanner.next(); } return rowSet; http://git-wip-us.apache.org/repos/asf/camel/blob/9edcfbc1/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareRowPrefixMatchingFilter.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareRowPrefixMatchingFilter.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareRowPrefixMatchingFilter.java new file mode 100644 index 0000000..84be756 --- /dev/null +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareRowPrefixMatchingFilter.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hbase.filters; + +import org.apache.camel.CamelContext; +import org.apache.camel.component.hbase.model.HBaseRow; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.PrefixFilter; + +/** + * A {@link FilterList} that contains multiple {@link PrefixFilter}s one per column that is part of the model. + */ +public class ModelAwareRowPrefixMatchingFilter extends FilterList implements ModelAwareFilter<FilterList> { + + /** + * Writable constructor, do not use. + */ + public ModelAwareRowPrefixMatchingFilter() { + } + + /** + * Applies the message to {@link org.apache.hadoop.hbase.filter.Filter} to + * context. 
+ */ + @Override + public void apply(CamelContext context, HBaseRow rowModel) { + getFilters().clear(); + if (rowModel != null) { + if (rowModel.getId() != null) { + byte[] value = context.getTypeConverter().convertTo(byte[].class, rowModel.getId()); + PrefixFilter rowPrefixFilter = new PrefixFilter(value); + addFilter(rowPrefixFilter); + } + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/9edcfbc1/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java index 7781de8..d8c54bd 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java @@ -106,6 +106,7 @@ public class HeaderMappingStrategy implements CellMappingStrategy { * Applies the cells to the {@link org.apache.camel.Exchange}. */ public void applyGetResults(Message message, HBaseData data) { + message.setHeaders(message.getExchange().getIn().getHeaders()); int index = 1; if (data == null || data.getRows() == null) { return; @@ -126,6 +127,7 @@ public class HeaderMappingStrategy implements CellMappingStrategy { * Applies the cells to the {@link org.apache.camel.Exchange}. 
*/ public void applyScanResults(Message message, HBaseData data) { + message.setHeaders(message.getExchange().getIn().getHeaders()); int index = 1; if (data == null || data.getRows() == null) { return; http://git-wip-us.apache.org/repos/asf/camel/blob/9edcfbc1/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseCell.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseCell.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseCell.java index 54e5a1b..14f4104 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseCell.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseCell.java @@ -28,6 +28,7 @@ public class HBaseCell { private String family; private String qualifier; private Object value; + private Long timestamp; //The value type can be optionally specified for Gets and Scan, to specify how the byte[] read will be converted. private Class<?> valueType = String.class; @@ -73,6 +74,14 @@ public class HBaseCell { this.valueType = valueType; } + public Long getTimestamp() { + return timestamp; + } + + public void setTimestamp(Long timestamp) { + this.timestamp = timestamp; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -82,12 +91,21 @@ public class HBaseCell { return false; } - HBaseCell cell = (HBaseCell) o; + HBaseCell hBaseCell = (HBaseCell) o; - if (family != null ? !family.equals(cell.family) : cell.family != null) { + if (family != null ? !family.equals(hBaseCell.family) : hBaseCell.family != null) { + return false; + } + if (qualifier != null ? !qualifier.equals(hBaseCell.qualifier) : hBaseCell.qualifier != null) { + return false; + } + if (timestamp != null ? !timestamp.equals(hBaseCell.timestamp) : hBaseCell.timestamp != null) { + return false; + } + if (value != null ? 
!value.equals(hBaseCell.value) : hBaseCell.value != null) { return false; } - if (qualifier != null ? !qualifier.equals(cell.qualifier) : cell.qualifier != null) { + if (valueType != null ? !valueType.equals(hBaseCell.valueType) : hBaseCell.valueType != null) { return false; } @@ -98,6 +116,9 @@ public class HBaseCell { public int hashCode() { int result = family != null ? family.hashCode() : 0; result = 31 * result + (qualifier != null ? qualifier.hashCode() : 0); + result = 31 * result + (value != null ? value.hashCode() : 0); + result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0); + result = 31 * result + (valueType != null ? valueType.hashCode() : 0); return result; } } http://git-wip-us.apache.org/repos/asf/camel/blob/9edcfbc1/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseRow.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseRow.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseRow.java index f267bcf..daf6e5f 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseRow.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseRow.java @@ -29,6 +29,8 @@ public class HBaseRow implements Cloneable { private Class<?> rowType = String.class; private Set<HBaseCell> cells; + private long timestamp; + public HBaseRow() { this(new LinkedHashSet<HBaseCell>()); } @@ -70,6 +72,14 @@ public class HBaseRow implements Cloneable { return cells.size(); } + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + public void apply(HBaseRow modelRow) { if (modelRow != null) { if (rowType == null && modelRow.getRowType() != null) { 
http://git-wip-us.apache.org/repos/asf/camel/blob/9edcfbc1/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java index 2efd3b7..9e71c65 100644 --- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java +++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java @@ -233,6 +233,28 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { } @Test + public void testPutMultiRowsAndMaxScan() throws Exception { + testPutMultiRows(); + if (systemReady) { + Exchange resp = template.request("direct:maxScan", new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]); + exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]); + } + }); + + Object result1 = resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(1)); + Object result2 = resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(2)); + // as we use maxResults=2 we only get 2 results back + Object result3 = resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(3)); + assertNull("Should only get 2 results back", result3); + + List<?> bodies = Arrays.asList(body[0][0][0], body[1][0][0]); + assertTrue(bodies.contains(result1) && bodies.contains(result2)); + } + } + + @Test public void testPutMultiRowsAndScan() throws Exception { testPutMultiRows(); if (systemReady) { @@ -304,6 +326,9 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { .to("hbase://" + PERSON_TABLE + "?family=info&qualifier=firstName&family2=birthdate&qualifier2=year"); from("direct:scan") + .to("hbase://" + 
PERSON_TABLE + "?operation=" + HBaseConstants.SCAN); + + from("direct:maxScan") .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2"); } };