handling UGI within camel-hbase
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e0b3255c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e0b3255c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e0b3255c Branch: refs/heads/master Commit: e0b3255c233873dec0c14b593d7d82bb1efa4826 Parents: db823ea Author: woj-i <wojciechin...@gmail.com> Authored: Thu Oct 15 18:29:29 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Nov 24 19:07:55 2015 +0100 ---------------------------------------------------------------------- .../camel/component/hbase/HBaseConsumer.java | 12 +++---- .../camel/component/hbase/HBaseEndpoint.java | 37 ++++++++++++++++++-- .../camel/component/hbase/HBaseProducer.java | 11 ++---- 3 files changed, 41 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e0b3255c/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java index bf3760d..59aaa43 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java @@ -21,6 +21,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.Set; + import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.hbase.mapping.CellMappingStrategy; @@ -33,7 +34,6 @@ import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -50,22 +50,18 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer { private static final Logger LOG = LoggerFactory.getLogger(HBaseConsumer.class); - private String tableName; private final HBaseEndpoint endpoint; - private HTablePool tablePool; private HBaseRow rowModel; - public HBaseConsumer(HBaseEndpoint endpoint, Processor processor, HTablePool tablePool, String tableName) { + public HBaseConsumer(HBaseEndpoint endpoint, Processor processor) { super(endpoint, processor); this.endpoint = endpoint; - this.tableName = tableName; - this.tablePool = tablePool; this.rowModel = endpoint.getRowModel(); } @Override protected int poll() throws Exception { - HTableInterface table = tablePool.getTable(tableName); + HTableInterface table = endpoint.getTable(); try { shutdownRunningTask = null; pendingExchanges = 0; @@ -192,7 +188,7 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer { * Delegates to the {@link HBaseRemoveHandler}. */ private void remove(byte[] row) throws IOException { - HTableInterface table = tablePool.getTable(tableName); + HTableInterface table = endpoint.getTable(); try { endpoint.getRemoveHandler().remove(table, row); } finally { http://git-wip-us.apache.org/repos/asf/camel/blob/e0b3255c/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java index cb08a02..109418c 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.hbase; +import java.security.PrivilegedAction; import java.util.List; import org.apache.camel.Consumer; @@ -30,8 +31,10 @@ import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.security.UserGroupInformation; /** * Represents an HBase endpoint. @@ -66,21 +69,30 @@ public class HBaseEndpoint extends DefaultEndpoint { @UriParam(label = "consumer") private int maxMessagesPerPoll; + /** + * in the purpose of performance optimization + */ + private byte[] tableNameBytes; + + private UserGroupInformation ugi = null; + public HBaseEndpoint(String uri, HBaseComponent component, HTablePool tablePool, String tableName) { super(uri, component); this.tableName = tableName; this.tablePool = tablePool; if (this.tableName == null) { throw new IllegalArgumentException("Table name can not be null"); - } + }else{ + tableNameBytes = tableName.getBytes(); + } } public Producer createProducer() throws Exception { - return new HBaseProducer(this, tablePool, tableName); + return new HBaseProducer(this); } public Consumer createConsumer(Processor processor) throws Exception { - HBaseConsumer consumer = new HBaseConsumer(this, processor, tablePool, tableName); + HBaseConsumer consumer = new HBaseConsumer(this, processor); configureConsumer(consumer); consumer.setMaxMessagesPerPoll(maxMessagesPerPoll); return consumer; @@ -217,4 +229,23 @@ public class HBaseEndpoint extends DefaultEndpoint { public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { this.maxMessagesPerPoll = maxMessagesPerPoll; } + + + /** + * Gets connection to the table (secured or not, depends on the object initialization) + * please remember to close the table after use + * @return table, remember to close! + */ + public HTableInterface getTable(){ + if (ugi!=null){ + return ugi.doAs(new PrivilegedAction<HTableInterface>() { + @Override + public HTableInterface run() { + return tablePool.getTable(tableNameBytes); + } + }); + }else{ + return tablePool.getTable(tableNameBytes); + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/e0b3255c/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java index 4a921aa..215e502 100644 --- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java +++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -49,20 +48,16 @@ import org.apache.hadoop.hbase.util.Bytes; public class HBaseProducer extends DefaultProducer implements ServicePoolAware { private HBaseEndpoint endpoint; - private String tableName; - private final HTablePool tablePool; private HBaseRow rowModel; - public HBaseProducer(HBaseEndpoint endpoint, HTablePool tablePool, String tableName) { + public HBaseProducer(HBaseEndpoint endpoint) { super(endpoint); this.endpoint = endpoint; - this.tableName = tableName; - this.tablePool = tablePool; this.rowModel = endpoint.getRowModel(); } public void process(Exchange exchange) throws Exception { - HTableInterface table = tablePool.getTable(tableName.getBytes()); + HTableInterface table = endpoint.getTable(); try { updateHeaders(exchange); @@ -105,7 +100,7 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware { mappingStrategy.applyScanResults(exchange.getOut(), new HBaseData(scanOperationResult)); } } finally { - table.close(); + table.close(); } }