Author: iocanel
Date: Tue Jun 12 13:59:41 2012
New Revision: 1349362

URL: http://svn.apache.org/viewvc?rev=1349362&view=rev
Log:
[CAMEL-4743] Improved filter support for scan operations. Added filters that 
are camel context aware and also aware of the message content.

Added:
    
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/
    
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareColumnMatchingFilter.java
    
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareFilter.java
    
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareFilterList.java
    
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareSkipFilter.java
    
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareWhileMatchFilter.java
    
camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java
      - copied, changed from r1349256, 
camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java
Modified:
    
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java
    
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
    
camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java
    
camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java
    
camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
    camel/trunk/components/camel-hbase/src/test/resources/producer.xml

Modified: 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java?rev=1349362&r1=1349361&r2=1349362&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java
 (original)
+++ 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseComponent.java
 Tue Jun 12 13:59:41 2012
@@ -83,6 +83,7 @@ public class HBaseComponent extends Defa
             HBaseCell cellModel = new HBaseCell();
             
cellModel.setFamily(String.valueOf(parameters.remove(HbaseAttribute.HBASE_FAMILY.asOption(i))));
             
cellModel.setQualifier(String.valueOf(parameters.remove(HbaseAttribute.HBASE_QUALIFIER.asOption(i))));
+            
cellModel.setValue(String.valueOf(parameters.remove(HbaseAttribute.HBASE_VALUE.asOption(i))));
             if 
(parameters.containsKey(HbaseAttribute.HBASE_VALUE_TYPE.asOption())) {
                 String valueType = 
String.valueOf(parameters.remove(HbaseAttribute.HBASE_VALUE_TYPE.asOption()));
                 if (valueType != null && !valueType.isEmpty()) {

Modified: 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java?rev=1349362&r1=1349361&r2=1349362&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
 (original)
+++ 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
 Tue Jun 12 13:59:41 2012
@@ -20,7 +20,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.ServicePoolAware;
+import org.apache.camel.component.hbase.filters.ModelAwareFilter;
 import org.apache.camel.component.hbase.mapping.CellMappingStrategy;
 import org.apache.camel.component.hbase.mapping.CellMappingStrategyFactory;
 import org.apache.camel.component.hbase.model.HBaseCell;
@@ -206,6 +208,11 @@ public class HBaseProducer extends Defau
         List<HBaseRow> rowSet = new LinkedList<HBaseRow>();
         Scan scan = new Scan();
         if (filters != null && !filters.isEmpty()) {
+            for (Filter filter : filters) {
+                if 
(ModelAwareFilter.class.isAssignableFrom(filter.getClass())) {
+                    ((ModelAwareFilter) 
filter).apply(endpoint.getCamelContext(), model);
+                }
+            }
             scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, 
filters));
         }
         Set<HBaseCell> cellModels = model.getCells();
@@ -231,6 +238,8 @@ public class HBaseProducer extends Defau
                 
resultRow.setId(endpoint.getCamelContext().getTypeConverter().convertTo(model.getRowType(),
 result.getRow()));
                 
resultCell.setValue(endpoint.getCamelContext().getTypeConverter().convertTo(modelCell.getValueType(),
                         
result.getValue(HBaseHelper.getHBaseFieldAsBytes(family), 
HBaseHelper.getHBaseFieldAsBytes(column))));
+                resultCell.setFamily(modelCell.getFamily());
+                resultCell.setQualifier(modelCell.getQualifier());
                 resultRow.getCells().add(resultCell);
                 rowSet.add(resultRow);
             }

Added: 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareColumnMatchingFilter.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareColumnMatchingFilter.java?rev=1349362&view=auto
==============================================================================
--- 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareColumnMatchingFilter.java
 (added)
+++ 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareColumnMatchingFilter.java
 Tue Jun 12 13:59:41 2012
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hbase.filters;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.hbase.HBaseHelper;
+import org.apache.camel.component.hbase.model.HBaseCell;
+import org.apache.camel.component.hbase.model.HBaseRow;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+
+/**
+ * A {@link FilterList} that contains multiple {@link 
SingleColumnValueExcludeFilter}s one per column that is part of the model.
+ */
+public class ModelAwareColumnMatchingFilter extends FilterList implements 
ModelAwareFilter<FilterList> {
+
+    /**
+     * Writable constructor, do not use.
+     */
+    public ModelAwareColumnMatchingFilter() {
+    }
+
+
+
+    /**
+     * Applies the message to {@link org.apache.hadoop.hbase.filter.Filter} to 
context.
+     *
+     * @param context
+     * @param rowModel
+     */
+    @Override
+    public void apply(CamelContext context, HBaseRow rowModel) {
+        getFilters().clear();
+        if (rowModel != null) {
+            for (HBaseCell cell : rowModel.getCells()) {
+                if (cell.getValue() != null) {
+                    byte[] family = 
HBaseHelper.getHBaseFieldAsBytes(cell.getFamily());
+                    byte[] qualifier = 
HBaseHelper.getHBaseFieldAsBytes(cell.getQualifier());
+                    byte[] value = 
context.getTypeConverter().convertTo(byte[].class, cell.getValue());
+                    SingleColumnValueFilter columnValueFilter = new 
SingleColumnValueFilter(family, qualifier, CompareFilter.CompareOp.EQUAL, 
value);
+                    addFilter(columnValueFilter);
+                }
+            }
+        }
+    }
+}

Added: 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareFilter.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareFilter.java?rev=1349362&view=auto
==============================================================================
--- 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareFilter.java
 (added)
+++ 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareFilter.java
 Tue Jun 12 13:59:41 2012
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hbase.filters;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.hbase.model.HBaseRow;
+import org.apache.hadoop.hbase.filter.Filter;
+
+public interface ModelAwareFilter<T extends Filter> extends Filter {
+
+    /**
+     * Applies the message to {@link Filter} to context.
+     * @param context
+     * @param rowModel
+     */
+    void apply(CamelContext context, HBaseRow rowModel);
+}

Added: 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareFilterList.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareFilterList.java?rev=1349362&view=auto
==============================================================================
--- 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareFilterList.java
 (added)
+++ 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareFilterList.java
 Tue Jun 12 13:59:41 2012
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hbase.filters;
+
+import java.util.List;
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.hbase.model.HBaseRow;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+
+public class ModelAwareFilterList extends FilterList implements 
ModelAwareFilter<FilterList> {
+
+    /**
+     * Default constructor, filters nothing. Required though for RPC
+     * deserialization.
+     */
+    public ModelAwareFilterList() {
+    }
+
+    /**
+     * Constructor that takes a set of {@link 
org.apache.hadoop.hbase.filter.Filter}s. The default operator
+     * MUST_PASS_ALL is assumed.
+     *
+     * @param rowFilters list of filters
+     */
+    public ModelAwareFilterList(List<Filter> rowFilters) {
+        super(rowFilters);
+    }
+
+    /**
+     * Constructor that takes an operator.
+     *
+     * @param operator Operator to process filter set with.
+     */
+    public ModelAwareFilterList(Operator operator) {
+        super(operator);
+    }
+
+    /**
+     * Constructor that takes a set of {@link 
org.apache.hadoop.hbase.filter.Filter}s and an operator.
+     *
+     * @param operator   Operator to process filter set with.
+     * @param rowFilters Set of row filters.
+     */
+    public ModelAwareFilterList(Operator operator, List<Filter> rowFilters) {
+        super(operator, rowFilters);
+    }
+
+    /**
+     * Applies the message to {@link org.apache.hadoop.hbase.filter.Filter} to 
context.
+     *
+     * @param context
+     * @param rowModel
+     */
+    @Override
+    public void apply(CamelContext context, HBaseRow rowModel) {
+        for (Filter filter : getFilters()) {
+            if (ModelAwareFilter.class.isAssignableFrom(filter.getClass())) {
+                ((ModelAwareFilter) filter).apply(context, rowModel);
+            }
+        }
+    }
+
+    /**
+     * Wraps an existing {@link FilterList} filter into a {@link 
ModelAwareFilterList}.
+     * @param filter
+     * @return
+     */
+    public ModelAwareFilterList wrap(FilterList filter) {
+        return new ModelAwareFilterList(filter.getOperator(), 
filter.getFilters());
+    }
+}

Added: 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareSkipFilter.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareSkipFilter.java?rev=1349362&view=auto
==============================================================================
--- 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareSkipFilter.java
 (added)
+++ 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareSkipFilter.java
 Tue Jun 12 13:59:41 2012
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hbase.filters;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.hbase.model.HBaseRow;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.SkipFilter;
+
+public class ModelAwareSkipFilter extends SkipFilter implements 
ModelAwareFilter<SkipFilter> {
+
+
+    public ModelAwareSkipFilter() {
+    }
+
+    public ModelAwareSkipFilter(Filter filter) {
+        super(filter);
+    }
+
+    /**
+     * Applies the message to {@link org.apache.hadoop.hbase.filter.Filter} to 
context.
+     *
+     * @param context
+     * @param rowModel
+     */
+    @Override
+    public void apply(CamelContext context, HBaseRow rowModel) {
+        if (ModelAwareFilter.class.isAssignableFrom(getFilter().getClass())) {
+            ((ModelAwareFilter) getFilter()).apply(context, rowModel);
+        }
+    }
+
+
+    /**
+     * Wraps an existing {@link SkipFilter} filter into a {@link 
ModelAwareSkipFilter}.
+     *
+     * @param filter
+     * @return
+     */
+    public ModelAwareSkipFilter wrap(SkipFilter filter) {
+        return new ModelAwareSkipFilter(filter.getFilter());
+    }
+}

Added: 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareWhileMatchFilter.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareWhileMatchFilter.java?rev=1349362&view=auto
==============================================================================
--- 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareWhileMatchFilter.java
 (added)
+++ 
camel/trunk/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/filters/ModelAwareWhileMatchFilter.java
 Tue Jun 12 13:59:41 2012
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hbase.filters;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.hbase.model.HBaseRow;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.SkipFilter;
+import org.apache.hadoop.hbase.filter.WhileMatchFilter;
+
+public class ModelAwareWhileMatchFilter extends WhileMatchFilter implements 
ModelAwareFilter<WhileMatchFilter> {
+
+    public ModelAwareWhileMatchFilter() {
+    }
+
+    public ModelAwareWhileMatchFilter(Filter filter) {
+        super(filter);
+    }
+
+    /**
+     * Applies the message to {@link org.apache.hadoop.hbase.filter.Filter} to 
context.
+     *
+     * @param context
+     * @param rowModel
+     */
+    @Override
+    public void apply(CamelContext context, HBaseRow rowModel) {
+        if (ModelAwareFilter.class.isAssignableFrom(getFilter().getClass())) {
+            ((ModelAwareFilter) getFilter()).apply(context, rowModel);
+        }
+    }
+
+    /**
+     * Wraps an existing {@link WhileMatchFilter} filter into a {@link 
ModelAwareWhileMatchFilter}.
+     *
+     * @param filter
+     * @return
+     */
+    public ModelAwareWhileMatchFilter wrap(WhileMatchFilter filter) {
+        return new ModelAwareWhileMatchFilter(filter.getFilter());
+    }
+
+}

Copied: 
camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java
 (from r1349256, 
camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java?p2=camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java&p1=camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java&r1=1349256&r2=1349362&rev=1349362&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java
 (original)
+++ 
camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java
 Tue Jun 12 13:59:41 2012
@@ -14,29 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.hbase;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.hbase.filters.ModelAwareColumnMatchingFilter;
+import org.apache.camel.impl.JndiRegistry;
 import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-public class HBaseConsumerTest extends CamelHBaseTestSupport {
+public class CamelHBaseFilterTest extends CamelHBaseTestSupport {
 
-    protected Object[] key = {"1", "2", "3"};
-    protected final String[] body = {"Hello Hbase", "Hi HBase", "Yo HBase"};
-    protected final String[] family = {"family1", "family2", "family3"};
-    protected final String[] column = {"mycolumn1", "mycolumn2", "mycolumn3"};
-    protected final byte[][] families = {DEFAULTFAMILY.getBytes(),
-            family[0].getBytes(),
-            family[1].getBytes(),
-            family[2].getBytes()};
+    List<Filter> filters = new LinkedList<Filter>();
 
     @Before
     public void setUp() throws Exception {
@@ -54,39 +52,34 @@ public class HBaseConsumerTest extends C
     @After
     public void tearDown() throws Exception {
         if (systemReady) {
+            hbaseUtil.deleteTable(DEFAULTTABLE.getBytes());
             super.tearDown();
         }
     }
 
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        filters.add(new ModelAwareColumnMatchingFilter());
+        jndi.bind("myFilters", filters);
+        return jndi;
+    }
 
     @Test
-    public void testPutMultiRowsAndConsume() throws Exception {
+    public void testPutMultiRowsAndScanWithFilters() throws Exception {
         if (systemReady) {
+            putMultipleRows();
             ProducerTemplate template = context.createProducerTemplate();
-            Map<String, Object> headers = new HashMap<String, Object>();
-            headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
-            headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
-            headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(), column[0]);
-            headers.put(HbaseAttribute.HBASE_VALUE.asHeader(), body[0]);
-
-            headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(2), key[1]);
-            headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(2), family[0]);
-            headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(2), column[0]);
-            headers.put(HbaseAttribute.HBASE_VALUE.asHeader(2), body[1]);
-
-            headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(3), key[2]);
-            headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(3), family[0]);
-            headers.put(HbaseAttribute.HBASE_QUALIFIER.asHeader(3), column[0]);
-            headers.put(HbaseAttribute.HBASE_VALUE.asHeader(3), body[2]);
-
-            headers.put(HBaseContats.OPERATION, HBaseContats.PUT);
-
-            template.sendBodyAndHeaders("direct:start", null, headers);
-
-            MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
-            mockEndpoint.expectedMessageCount(3);
-            mockEndpoint.assertIsSatisfied(10000);
-            Thread.sleep(10000);
+            Endpoint endpoint = context.getEndpoint("direct:scan");
+
+            Exchange exchange = endpoint.createExchange(ExchangePattern.InOut);
+            exchange.getIn().setHeader(HbaseAttribute.HBASE_FAMILY.asHeader(), 
family[0]);
+            
exchange.getIn().setHeader(HbaseAttribute.HBASE_QUALIFIER.asHeader(), 
column[0]);
+            exchange.getIn().setHeader(HbaseAttribute.HBASE_VALUE.asHeader(), 
body[0]);
+            Exchange resp = template.send(endpoint, exchange);
+            Message out = resp.getOut();
+            assertTrue(out.getHeaders().containsValue(body[0]) && 
!out.getHeaders().containsValue(body[1]) && 
!out.getHeaders().containsValue(body[2]));
+
         }
     }
 
@@ -101,10 +94,10 @@ public class HBaseConsumerTest extends C
             public void configure() {
                 from("direct:start")
                         .to("hbase://" + DEFAULTTABLE);
-
-                from("hbase://" + DEFAULTTABLE)
-                        .to("mock:result");
+                from("direct:scan")
+                        .to("hbase://" + DEFAULTTABLE + "?operation=" + 
HBaseContats.SCAN + "&maxResults=2&filters=#myFilters");
             }
         };
     }
+
 }

Modified: 
camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java?rev=1349362&r1=1349361&r2=1349362&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java
 (original)
+++ 
camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java
 Tue Jun 12 13:59:41 2012
@@ -17,24 +17,37 @@
 
 package org.apache.camel.component.hbase;
 
+import java.io.IOException;
 import org.apache.camel.CamelContext;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
 public abstract class CamelHBaseTestSupport extends CamelTestSupport {
 
+    //The hbase testing utility has special requirements on the umask.
+    //We hold this value to check if the the minicluster has properly started 
and tests can be run.
+    protected static Boolean systemReady = true;
+
     protected static HBaseTestingUtility hbaseUtil = new HBaseTestingUtility();
     protected static int numServers = 1;
     protected static final String DEFAULTTABLE = "DEFAULTTABLE";
     protected static final String DEFAULTFAMILY = "DEFAULTFAMILY";
 
-    //The hbase testing utility has special requirements on the umask.
-    //We hold this value to check if the the minicluster has properly started 
and tests can be run.
-    protected static Boolean systemReady = true;
-
+    protected String[] key = {"1", "2", "3"};
+    protected final String[] body = {"Hello Hbase", "Hi HBase", "Yo HBase"};
+    protected final String[] family = {"family1", "family2", "family3"};
+    protected final String[] column = {"mycolumn1", "mycolumn2", "mycolumn3"};
+    protected final byte[][] families = {DEFAULTFAMILY.getBytes(),
+            family[0].getBytes(),
+            family[1].getBytes(),
+            family[2].getBytes()};
 
     @BeforeClass
     public static void setUpClass() throws Exception {
@@ -60,4 +73,15 @@ public abstract class CamelHBaseTestSupp
         context.addComponent("hbase", component);
         return context;
     }
+
+
+    protected void putMultipleRows() throws IOException {
+        Configuration configuration = 
hbaseUtil.getHBaseAdmin().getConfiguration();
+        HTable table = new HTable(configuration, DEFAULTTABLE.getBytes());
+        for (int r = 0; r < key.length; r++) {
+            Put put = new Put(key[r].getBytes());
+            put.add(family[0].getBytes(), column[0].getBytes(), 
body[r].getBytes());
+            table.put(put);
+        }
+    }
 }

Modified: 
camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java?rev=1349362&r1=1349361&r2=1349362&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java
 (original)
+++ 
camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java
 Tue Jun 12 13:59:41 2012
@@ -29,15 +29,6 @@ import org.junit.Test;
 
 public class HBaseConsumerTest extends CamelHBaseTestSupport {
 
-    protected Object[] key = {"1", "2", "3"};
-    protected final String[] body = {"Hello Hbase", "Hi HBase", "Yo HBase"};
-    protected final String[] family = {"family1", "family2", "family3"};
-    protected final String[] column = {"mycolumn1", "mycolumn2", "mycolumn3"};
-    protected final byte[][] families = {DEFAULTFAMILY.getBytes(),
-            family[0].getBytes(),
-            family[1].getBytes(),
-            family[2].getBytes()};
-
     @Before
     public void setUp() throws Exception {
         if (systemReady) {

Modified: 
camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java?rev=1349362&r1=1349361&r2=1349362&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
 (original)
+++ 
camel/trunk/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
 Tue Jun 12 13:59:41 2012
@@ -37,16 +37,6 @@ import org.junit.Test;
 
 public class HBaseProducerTest extends CamelHBaseTestSupport {
 
-    protected String[] key = {"1", "2", "3"};
-    protected final String[] body = {"Hello Hbase", "Hi HBase", "Yo HBase"};
-    protected final String[] family = {"family1", "family2", "family3"};
-    protected final String[] column = {"mycolumn1", "mycolumn2", "mycolumn3"};
-    protected final byte[][] families = {DEFAULTFAMILY.getBytes(),
-            family[0].getBytes(),
-            family[1].getBytes(),
-            family[2].getBytes()};
-
-
     @Before
     public void setUp() throws Exception {
         if (systemReady) {
@@ -292,7 +282,6 @@ public class HBaseProducerTest extends C
         }
     }
 
-
     /**
      * Factory method which derived classes can use to create a {@link 
org.apache.camel.builder.RouteBuilder}
      * to define the routes for testing
@@ -306,7 +295,7 @@ public class HBaseProducerTest extends C
                         .to("hbase://" + DEFAULTTABLE);
 
                 from("direct:scan")
-                        .to("hbase://" + DEFAULTTABLE + "?operation=" + 
HBaseContats.SCAN + "&maxResults=2&family=family1&qualifier=column1");
+                        .to("hbase://" + DEFAULTTABLE + "?operation=" + 
HBaseContats.SCAN + "&maxResults=2");
             }
         };
     }

Modified: camel/trunk/components/camel-hbase/src/test/resources/producer.xml
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hbase/src/test/resources/producer.xml?rev=1349362&r1=1349361&r2=1349362&view=diff
==============================================================================
--- camel/trunk/components/camel-hbase/src/test/resources/producer.xml 
(original)
+++ camel/trunk/components/camel-hbase/src/test/resources/producer.xml Tue Jun 
12 13:59:41 2012
@@ -29,7 +29,7 @@
         </route>
         <route>
             <from uri="direct:scan"/>
-            <to 
uri="hbase://DEFAULTTABLE?operation=CamelHBaseScan&amp;maxResults=2&amp;family=family1&amp;qualifier=column1"/>
+            <to 
uri="hbase://DEFAULTTABLE?operation=CamelHBaseScan&amp;maxResults=2"/>
         </route>
     </camelContext>
 
@@ -40,4 +40,5 @@
     <bean id="config" factory-method="getConfiguration" 
factory-bean="utility"/>
 
     <bean id="utility" class="org.apache.hadoop.hbase.HBaseTestingUtility"/>
+
 </beans>
\ No newline at end of file


Reply via email to