CAMEL-8268: Various camel-hbases fixes and improvements. Thanks to Emilien for 
the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9edcfbc1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9edcfbc1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9edcfbc1

Branch: refs/heads/master
Commit: 9edcfbc1bfeeeb3625fb91815f2a8a117679fd16
Parents: e7c6529
Author: Claus Ibsen <davscl...@apache.org>
Authored: Thu Feb 12 07:15:20 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu Feb 12 07:15:20 2015 +0100

----------------------------------------------------------------------
 .../camel/component/hbase/HBaseConstants.java   |  3 +-
 .../camel/component/hbase/HBaseEndpoint.java    |  4 +-
 .../camel/component/hbase/HBaseProducer.java    | 69 +++++++++++++-------
 .../ModelAwareRowPrefixMatchingFilter.java      | 50 ++++++++++++++
 .../hbase/mapping/HeaderMappingStrategy.java    |  2 +
 .../camel/component/hbase/model/HBaseCell.java  | 27 +++++++-
 .../camel/component/hbase/model/HBaseRow.java   | 10 +++
 .../component/hbase/HBaseProducerTest.java      | 25 +++++++
 8 files changed, 160 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9edcfbc1/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java
 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java
index c91a4d6..42ae4fc 100644
--- 
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java
+++ 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java
@@ -26,5 +26,6 @@ public interface HBaseConstants {
     String DELETE = "CamelHBaseDelete";
 
     String HBASE_MAX_SCAN_RESULTS = "CamelHBaseMaxScanResults";
-
+    
+    String FROM_ROW = "CamelHBaseStartRow";
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9edcfbc1/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java
 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java
index ff547ef..11587fb 100644
--- 
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java
+++ 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java
@@ -39,11 +39,11 @@ import org.apache.hadoop.hbase.filter.Filter;
 public class HBaseEndpoint extends DefaultEndpoint {
 
     private Configuration configuration;
-    @UriPath
-    private final String tableName;
     private final HTablePool tablePool;
     private HBaseAdmin admin;
 
+    @UriPath
+    private final String tableName;
     //Operation properties.
     @UriParam(defaultValue = "100")
     private int maxResults = 100;

http://git-wip-us.apache.org/repos/asf/camel/blob/9edcfbc1/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
index b9c5ebb..053309f 100644
--- 
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
+++ 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.hbase;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.ServicePoolAware;
 import org.apache.camel.component.hbase.filters.ModelAwareFilter;
@@ -29,7 +30,6 @@ import org.apache.camel.component.hbase.model.HBaseData;
 import org.apache.camel.component.hbase.model.HBaseRow;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.ObjectHelper;
-
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * The HBase producer.
@@ -66,6 +67,9 @@ public class HBaseProducer extends DefaultProducer implements 
ServicePoolAware {
 
             updateHeaders(exchange);
             String operation = (String) 
exchange.getIn().getHeader(HBaseConstants.OPERATION);
+
+            Integer maxScanResult = 
exchange.getIn().getHeader(HBaseConstants.HBASE_MAX_SCAN_RESULTS, 
Integer.class);
+            String fromRowId = (String) 
exchange.getIn().getHeader(HBaseConstants.FROM_ROW);
             CellMappingStrategy mappingStrategy = 
endpoint.getCellMappingStrategyFactory().getStrategy(exchange.getIn());
 
             HBaseData data = mappingStrategy.resolveModel(exchange.getIn());
@@ -85,7 +89,7 @@ public class HBaseProducer extends DefaultProducer implements 
ServicePoolAware {
                 } else if (HBaseConstants.DELETE.equals(operation)) {
                     deleteOperations.add(createDeleteRow(hRow));
                 } else if (HBaseConstants.SCAN.equals(operation)) {
-                    scanOperationResult = scanCells(table, hRow, 
endpoint.getFilters());
+                    scanOperationResult = scanCells(table, hRow, fromRowId, 
maxScanResult, endpoint.getFilters());
                 }
             }
 
@@ -131,13 +135,8 @@ public class HBaseProducer extends DefaultProducer 
implements ServicePoolAware {
     }
 
     /**
-     * Perfoms an HBase {@link Get} on a specific row, using a collection of 
values (family/column/value pairs).
+     * Performs an HBase {@link Get} on a specific row, using a collection of 
values (family/column/value pairs).
      * The result is <p>the most recent entry</p> for each column.
-     *
-     * @param table
-     * @param hRow
-     * @return
-     * @throws Exception
      */
     private HBaseRow getCells(HTableInterface table, HBaseRow hRow) throws 
Exception {
         HBaseRow resultRow = new HBaseRow();
@@ -160,6 +159,10 @@ public class HBaseProducer extends DefaultProducer 
implements ServicePoolAware {
 
         Result result = table.get(get);
 
+        if (!result.isEmpty()) {
+            resultRow.setTimestamp(result.raw()[0].getTimestamp());
+        }
+
         for (HBaseCell cellModel : cellModels) {
             HBaseCell resultCell = new HBaseCell();
             String family = cellModel.getFamily();
@@ -171,6 +174,7 @@ public class HBaseProducer extends DefaultProducer 
implements ServicePoolAware {
             if (kvs != null && !kvs.isEmpty()) {
                 //Return the most recent entry.
                 
resultCell.setValue(endpoint.getCamelContext().getTypeConverter().convertTo(cellModel.getValueType(),
 kvs.get(0).getValue()));
+                resultCell.setTimestamp(kvs.get(0).getTimestamp());
             }
             resultCells.add(resultCell);
             resultRow.getCells().add(resultCell);
@@ -180,9 +184,6 @@ public class HBaseProducer extends DefaultProducer 
implements ServicePoolAware {
 
     /**
      * Creates an HBase {@link Delete} on a specific row, using a collection 
of values (family/column/value pairs).
-     *
-     * @param hRow
-     * @throws Exception
      */
     private Delete createDeleteRow(HBaseRow hRow) throws Exception {
         ObjectHelper.notNull(hRow, "HBase row");
@@ -191,25 +192,37 @@ public class HBaseProducer extends DefaultProducer 
implements ServicePoolAware {
     }
 
     /**
-     * Perfoms an HBase {@link Get} on a specific row, using a collection of 
values (family/column/value pairs).
+     * Performs an HBase {@link Get} on a specific row, using a collection of 
values (family/column/value pairs).
      * The result is <p>the most recent entry</p> for each column.
-     *
-     * @param table
-     * @param model
-     * @return
-     * @throws Exception
      */
-    private List<HBaseRow> scanCells(HTableInterface table, HBaseRow model, 
List<Filter> filters) throws Exception {
+    private List<HBaseRow> scanCells(HTableInterface table, HBaseRow model, 
String start, Integer maxRowScan, List<Filter> filters) throws Exception {
         List<HBaseRow> rowSet = new LinkedList<HBaseRow>();
-        Scan scan = new Scan();
+
+        HBaseRow startRow = new HBaseRow(model.getCells());
+        startRow.setId(start);
+
+        Scan scan;
+        if (start != null) {
+            scan = new Scan(Bytes.toBytes(start));
+        } else {
+            scan = new Scan();
+        }
+
+        // need to clone the filters as they are not thread safe to use
         if (filters != null && !filters.isEmpty()) {
+            List<Filter> clonedFilters = new LinkedList<Filter>();
             for (Filter filter : filters) {
                 if 
(ModelAwareFilter.class.isAssignableFrom(filter.getClass())) {
-                    ((ModelAwareFilter<?>) 
filter).apply(endpoint.getCamelContext(), model);
+                    Object clone = 
endpoint.getCamelContext().getInjector().newInstance(filter.getClass());
+                    if (clone instanceof ModelAwareFilter) {
+                        ((ModelAwareFilter<?>) 
clone).apply(endpoint.getCamelContext(), model);
+                        clonedFilters.add((ModelAwareFilter<?>) clone);
+                    }
                 }
             }
-            scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, 
filters));
+            scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, 
clonedFilters));
         }
+
         Set<HBaseCell> cellModels = model.getCells();
         for (HBaseCell cellModel : cellModels) {
             String family = cellModel.getFamily();
@@ -221,10 +234,14 @@ public class HBaseProducer extends DefaultProducer 
implements ServicePoolAware {
         }
 
         ResultScanner resultScanner = table.getScanner(scan);
+        int count = 0;
         Result result = resultScanner.next();
-        while (result != null) {
+
+        while (result != null && count < maxRowScan) {
             HBaseRow resultRow = new HBaseRow();
             
resultRow.setId(endpoint.getCamelContext().getTypeConverter().convertTo(model.getRowType(),
 result.getRow()));
+
+            resultRow.setTimestamp(result.raw()[0].getTimestamp());
             cellModels = model.getCells();
             for (HBaseCell modelCell : cellModels) {
                 HBaseCell resultCell = new HBaseCell();
@@ -235,10 +252,14 @@ public class HBaseProducer extends DefaultProducer 
implements ServicePoolAware {
                         
result.getValue(HBaseHelper.getHBaseFieldAsBytes(family), 
HBaseHelper.getHBaseFieldAsBytes(column))));
                 resultCell.setFamily(modelCell.getFamily());
                 resultCell.setQualifier(modelCell.getQualifier());
+
+                if 
(result.getColumnLatest(HBaseHelper.getHBaseFieldAsBytes(family), 
HBaseHelper.getHBaseFieldAsBytes(column)) != null) {
+                    
resultCell.setTimestamp(result.getColumnLatest(HBaseHelper.getHBaseFieldAsBytes(family),
 HBaseHelper.getHBaseFieldAsBytes(column)).getTimestamp());
+                }
                 resultRow.getCells().add(resultCell);
-                rowSet.add(resultRow);
             }
-
+            rowSet.add(resultRow);
+            count++;
             result = resultScanner.next();
         }
         return rowSet;

http://git-wip-us.apache.org/repos/asf/camel/blob/9edcfbc1/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareRowPrefixMatchingFilter.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareRowPrefixMatchingFilter.java
 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareRowPrefixMatchingFilter.java
new file mode 100644
index 0000000..84be756
--- /dev/null
+++ 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareRowPrefixMatchingFilter.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hbase.filters;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.hbase.model.HBaseRow;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+
+/**
+ * A {@link FilterList} that contains multiple {@link PrefixFilter}s one per 
column that is part of the model.
+ */
+public class ModelAwareRowPrefixMatchingFilter extends FilterList implements 
ModelAwareFilter<FilterList> {
+
+    /**
+     * Writable constructor, do not use.
+     */
+    public ModelAwareRowPrefixMatchingFilter() {
+    }
+
+    /**
+     * Applies the message to {@link org.apache.hadoop.hbase.filter.Filter} to
+     * context.
+     */
+    @Override
+    public void apply(CamelContext context, HBaseRow rowModel) {
+        getFilters().clear();
+        if (rowModel != null) {
+            if (rowModel.getId() != null) {
+                byte[] value = 
context.getTypeConverter().convertTo(byte[].class, rowModel.getId());
+                PrefixFilter rowPrefixFilter = new PrefixFilter(value);
+                addFilter(rowPrefixFilter);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/9edcfbc1/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java
 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java
index 7781de8..d8c54bd 100644
--- 
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java
+++ 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/mapping/HeaderMappingStrategy.java
@@ -106,6 +106,7 @@ public class HeaderMappingStrategy implements 
CellMappingStrategy {
      * Applies the cells to the {@link org.apache.camel.Exchange}.
      */
     public void applyGetResults(Message message, HBaseData data) {
+        message.setHeaders(message.getExchange().getIn().getHeaders());
         int index = 1;
         if (data == null || data.getRows() == null) {
             return;
@@ -126,6 +127,7 @@ public class HeaderMappingStrategy implements 
CellMappingStrategy {
      * Applies the cells to the {@link org.apache.camel.Exchange}.
      */
     public void applyScanResults(Message message, HBaseData data) {
+        message.setHeaders(message.getExchange().getIn().getHeaders());
         int index = 1;
         if (data == null || data.getRows() == null) {
             return;

http://git-wip-us.apache.org/repos/asf/camel/blob/9edcfbc1/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseCell.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseCell.java
 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseCell.java
index 54e5a1b..14f4104 100644
--- 
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseCell.java
+++ 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseCell.java
@@ -28,6 +28,7 @@ public class HBaseCell {
     private String family;
     private String qualifier;
     private Object value;
+    private Long timestamp;
     //The value type can be optionally specified for Gets and Scan, to specify 
how the byte[] read will be converted.
     private Class<?> valueType = String.class;
 
@@ -73,6 +74,14 @@ public class HBaseCell {
         this.valueType = valueType;
     }
 
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(Long timestamp) {
+        this.timestamp = timestamp;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -82,12 +91,21 @@ public class HBaseCell {
             return false;
         }
 
-        HBaseCell cell = (HBaseCell) o;
+        HBaseCell hBaseCell = (HBaseCell) o;
 
-        if (family != null ? !family.equals(cell.family) : cell.family != 
null) {
+        if (family != null ? !family.equals(hBaseCell.family) : 
hBaseCell.family != null) {
+            return false;
+        }
+        if (qualifier != null ? !qualifier.equals(hBaseCell.qualifier) : 
hBaseCell.qualifier != null) {
+            return false;
+        }
+        if (timestamp != null ? !timestamp.equals(hBaseCell.timestamp) : 
hBaseCell.timestamp != null) {
+            return false;
+        }
+        if (value != null ? !value.equals(hBaseCell.value) : hBaseCell.value 
!= null) {
             return false;
         }
-        if (qualifier != null ? !qualifier.equals(cell.qualifier) : 
cell.qualifier != null) {
+        if (valueType != null ? !valueType.equals(hBaseCell.valueType) : 
hBaseCell.valueType != null) {
             return false;
         }
 
@@ -98,6 +116,9 @@ public class HBaseCell {
     public int hashCode() {
         int result = family != null ? family.hashCode() : 0;
         result = 31 * result + (qualifier != null ? qualifier.hashCode() : 0);
+        result = 31 * result + (value != null ? value.hashCode() : 0);
+        result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0);
+        result = 31 * result + (valueType != null ? valueType.hashCode() : 0);
         return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9edcfbc1/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseRow.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseRow.java
 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseRow.java
index f267bcf..daf6e5f 100644
--- 
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseRow.java
+++ 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/model/HBaseRow.java
@@ -29,6 +29,8 @@ public class HBaseRow implements Cloneable {
     private Class<?> rowType = String.class;
     private Set<HBaseCell> cells;
 
+    private long timestamp;
+
     public HBaseRow() {
         this(new LinkedHashSet<HBaseCell>());
     }
@@ -70,6 +72,14 @@ public class HBaseRow implements Cloneable {
         return cells.size();
     }
 
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
     public void apply(HBaseRow modelRow) {
         if (modelRow != null) {
             if (rowType == null && modelRow.getRowType() != null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/9edcfbc1/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
 
b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
index 2efd3b7..9e71c65 100644
--- 
a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
+++ 
b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
@@ -233,6 +233,28 @@ public class HBaseProducerTest extends 
CamelHBaseTestSupport {
     }
 
     @Test
+    public void testPutMultiRowsAndMaxScan() throws Exception {
+        testPutMultiRows();
+        if (systemReady) {
+            Exchange resp = template.request("direct:maxScan", new Processor() 
{
+                public void process(Exchange exchange) throws Exception {
+                    
exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
+                    
exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), 
column[0][0]);
+                }
+            });
+
+            Object result1 = 
resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(1));
+            Object result2 = 
resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(2));
+            // as we use maxResults=2 we only get 2 results back
+            Object result3 = 
resp.getOut().getHeader(HbaseAttribute.HBASE_VALUE.asHeader(3));
+            assertNull("Should only get 2 results back", result3);
+
+            List<?> bodies = Arrays.asList(body[0][0][0], body[1][0][0]);
+            assertTrue(bodies.contains(result1) && bodies.contains(result2));
+        }
+    }
+
+    @Test
     public void testPutMultiRowsAndScan() throws Exception {
         testPutMultiRows();
         if (systemReady) {
@@ -304,6 +326,9 @@ public class HBaseProducerTest extends 
CamelHBaseTestSupport {
                     .to("hbase://" + PERSON_TABLE + 
"?family=info&qualifier=firstName&family2=birthdate&qualifier2=year");
 
                 from("direct:scan")
+                        .to("hbase://" + PERSON_TABLE + "?operation=" + 
HBaseConstants.SCAN);
+
+                from("direct:maxScan")
                     .to("hbase://" + PERSON_TABLE + "?operation=" + 
HBaseConstants.SCAN + "&maxResults=2");
             }
         };

Reply via email to