Repository: camel Updated Branches: refs/heads/master 758c575ae -> e95b669e8
CAMEL-9523 Use setStopRow on org.apache.hadoop.hbase.client.Scan Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e95b669e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e95b669e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e95b669e Branch: refs/heads/master Commit: e95b669e8ac4734651ba8a632de17d5bf3ab9950 Parents: 758c575 Author: Andrea Cosentino <anco...@gmail.com> Authored: Wed Mar 9 15:30:17 2016 +0100 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Wed Mar 9 15:31:12 2016 +0100 ---------------------------------------------------------------------- .../camel/component/hbase/HBaseConstants.java | 2 ++ .../camel/component/hbase/HBaseProducer.java | 9 ++++++-- .../component/hbase/HBaseProducerTest.java | 24 +++++++++++++++++++- 3 files changed, 32 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e95b669e/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java index 42ae4fc..3db652a 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConstants.java @@ -28,4 +28,6 @@ public interface HBaseConstants { String HBASE_MAX_SCAN_RESULTS = "CamelHBaseMaxScanResults"; String FROM_ROW = "CamelHBaseStartRow"; + + String STOP_ROW = "CamelHBaseStopRow"; } http://git-wip-us.apache.org/repos/asf/camel/blob/e95b669e/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java index b9ccfce..b09c829 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java @@ -63,6 +63,7 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { Integer maxScanResult = exchange.getIn().getHeader(HBaseConstants.HBASE_MAX_SCAN_RESULTS, Integer.class); String fromRowId = (String) exchange.getIn().getHeader(HBaseConstants.FROM_ROW); + String stopRowId = (String) exchange.getIn().getHeader(HBaseConstants.STOP_ROW); CellMappingStrategy mappingStrategy = endpoint.getCellMappingStrategyFactory().getStrategy(exchange.getIn()); HBaseData data = mappingStrategy.resolveModel(exchange.getIn()); @@ -82,7 +83,7 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { } else if (HBaseConstants.DELETE.equals(operation)) { deleteOperations.add(createDeleteRow(hRow)); } else if (HBaseConstants.SCAN.equals(operation)) { - scanOperationResult = scanCells(table, hRow, fromRowId, maxScanResult, endpoint.getFilters()); + scanOperationResult = scanCells(table, hRow, fromRowId, stopRowId, maxScanResult, endpoint.getFilters()); } } @@ -189,7 +190,7 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { * Performs an HBase {@link Get} on a specific row, using a collection of values (family/column/value pairs). * The result is <p>the most recent entry</p> for each column. */ - private List<HBaseRow> scanCells(Table table, HBaseRow model, String start, Integer maxRowScan, List<Filter> filters) + private List<HBaseRow> scanCells(Table table, HBaseRow model, String start, String stop, Integer maxRowScan, List<Filter> filters) throws Exception { List<HBaseRow> rowSet = new LinkedList<>(); @@ -202,6 +203,10 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { } else { scan = new Scan(); } + + if (ObjectHelper.isNotEmpty(stop)) { + scan.setStopRow(Bytes.toBytes(stop)); + } if (filters != null && !filters.isEmpty()) { for (int i = 0; i < filters.size(); i++) { http://git-wip-us.apache.org/repos/asf/camel/blob/e95b669e/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java index cac4539..ba96175 100644 --- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java +++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java @@ -275,11 +275,33 @@ public class HBaseProducerTest extends CamelHBaseTestSupport { Object result1 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(1)); Object result2 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2)); Object result3 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(3)); - + System.err.println(resp.getOut().getHeaders().toString()); List<?> bodies = Arrays.asList(body[0][0][0], body[1][0][0], body[2][0][0]); assertTrue(bodies.contains(result1) && bodies.contains(result2) && bodies.contains(result3)); } } + + @Test + public void testPutMultiRowsAndScanWithStop() throws Exception { + testPutMultiRows(); + if (systemReady) { + Exchange resp = template.request("direct:scan", new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(HBaseAttribute.HBASE_FAMILY.asHeader(), family[0]); + exchange.getIn().setHeader(HBaseAttribute.HBASE_QUALIFIER.asHeader(), column[0][0]); + exchange.getIn().setHeader(HBaseConstants.FROM_ROW, key[0]); + exchange.getIn().setHeader(HBaseConstants.STOP_ROW, key[1]); + } + }); + + Object result1 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(1)); + Object result2 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(2)); + Object result3 = resp.getOut().getHeader(HBaseAttribute.HBASE_VALUE.asHeader(3)); + + List<?> bodies = Arrays.asList(body[0][0][0], body[1][0][0], body[2][0][0]); + assertTrue(bodies.contains(result1) && !bodies.contains(result2) && !bodies.contains(result3)); + } + } @Test public void testPutAndScan() throws Exception {