This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch ranger-2.6
in repository https://gitbox.apache.org/repos/asf/ranger.git

commit 4d20197adf71749031a6f0c3d447058364cec525
Author: Fateh Singh <[email protected]>
AuthorDate: Wed Dec 4 18:09:21 2024 -0800

    RANGER-4670: (hbase plugin) Config to support disabling column 
authorization for fully authorized column families (#417)
    
    (cherry picked from commit cf736a5493eba485adfcba74cd0664efbdd321a0)
---
 .../hadoop/constants/RangerHadoopConstants.java    |   1 +
 .../authorization/hbase/AuthorizationSession.java  |  93 +--
 .../hbase/RangerAuthorizationCoprocessor.java      | 153 +++-
 .../hbase/RangerAuthorizationFilter.java           |   7 +-
 .../hbase/AuthorizationSessionTest.java            |   5 +-
 .../hbase/HBaseRangerAuthorizationTest.java        | 773 ++++++++++++++++-----
 .../hbase/RangerAuthorizationFilterTest.java       |   3 +-
 hbase-agent/src/test/resources/hbase-policies.json |  97 ++-
 8 files changed, 863 insertions(+), 269 deletions(-)

diff --git 
a/agents-common/src/main/java/org/apache/ranger/authorization/hadoop/constants/RangerHadoopConstants.java
 
b/agents-common/src/main/java/org/apache/ranger/authorization/hadoop/constants/RangerHadoopConstants.java
index fcd9ebd4d..86827e655 100644
--- 
a/agents-common/src/main/java/org/apache/ranger/authorization/hadoop/constants/RangerHadoopConstants.java
+++ 
b/agents-common/src/main/java/org/apache/ranger/authorization/hadoop/constants/RangerHadoopConstants.java
@@ -51,6 +51,7 @@ public class RangerHadoopConstants {
 
        public static final String  
HBASE_UPDATE_RANGER_POLICIES_ON_GRANT_REVOKE_PROP        = 
"xasecure.hbase.update.xapolicies.on.grant.revoke";
        public static final boolean 
HBASE_UPDATE_RANGER_POLICIES_ON_GRANT_REVOKE_DEFAULT_VALUE = true;
+       public static final String HBASE_COLUMN_AUTH_OPTIMIZATION = 
"ranger.plugin.hbase.column.auth.optimized";
        
        public static final String KNOX_ACCESS_VERIFIER_CLASS_NAME_PROP         
= "knox.authorization.verifier.classname";
        public static final String 
KNOX_ACCESS_VERIFIER_CLASS_NAME_DEFAULT_VALUE = 
"org.apache.ranger.pdp.knox.RangerAuthorizer";
diff --git 
a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/AuthorizationSession.java
 
b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/AuthorizationSession.java
index 6e999ba51..df68f571c 100644
--- 
a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/AuthorizationSession.java
+++ 
b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/AuthorizationSession.java
@@ -30,11 +30,7 @@ import 
org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
 import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
-import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
-import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
-import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
-import org.apache.ranger.plugin.policyengine.RangerAccessResult;
-import org.apache.ranger.plugin.service.RangerBasePlugin;
+import org.apache.ranger.plugin.policyengine.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +43,7 @@ public class AuthorizationSession {
        final HbaseUserUtils _userUtils = _factory.getUserUtils();
        final HbaseAuthUtils _authUtils = _factory.getAuthUtils();
        // immutable state
-       final RangerBasePlugin _authorizer;
+       final RangerHBasePlugin _authorizer;
        // Mutable state: Use supplied state information
        String _operation;
        String _otherInformation;
@@ -69,8 +65,8 @@ public class AuthorizationSession {
        // internal state per-authorization
        RangerAccessRequest _request;
        RangerAccessResult _result;
-       
-       public AuthorizationSession(RangerBasePlugin authorizer) {
+
+       public AuthorizationSession(RangerHBasePlugin authorizer) {
                _authorizer = authorizer;
        }
 
@@ -83,12 +79,12 @@ public class AuthorizationSession {
                _otherInformation = information;
                return this;
        }
-       
+
        AuthorizationSession remoteAddress(String ipAddress) {
                _remoteAddress = ipAddress;
                return this;
        }
-       
+
        AuthorizationSession access(String anAccess) {
                _access = anAccess;
                return this;
@@ -127,7 +123,7 @@ public class AuthorizationSession {
        }
 
        void verifyBuildable() {
-               
+
                String template = "Internal error: Incomplete/inconsisten 
state: [%s]. Can't build auth request!";
                if (_factory == null) {
                        String message = String.format(template, "factory is 
null");
@@ -174,11 +170,7 @@ public class AuthorizationSession {
                                StringUtils.equals(_operation, 
"getUserPermissionForNamespace");
        }
 
-       AuthorizationSession buildRequest() {
-
-               verifyBuildable();
-               // session can be reused so reset its state
-               zapAuthorizationState();
+       private RangerAccessResource createHBaseResource() {
                // TODO get this via a factory instead
                RangerAccessResourceImpl resource = new RangerHBaseResource();
                // policy engine should deal sensibly with null/empty values, 
if any
@@ -189,7 +181,11 @@ public class AuthorizationSession {
                }
                resource.setValue(RangerHBaseResource.KEY_COLUMN_FAMILY, 
_columnFamily);
                resource.setValue(RangerHBaseResource.KEY_COLUMN, _column);
-               
+               return resource;
+       }
+
+       private RangerAccessRequest createRangerRequest() {
+               RangerAccessResource resource = createHBaseResource();
                String user = _userUtils.getUserAsString(_user);
                RangerAccessRequestImpl request = new 
RangerAccessRequestImpl(resource, _access, user, _groups, null);
                request.setAction(_operation);
@@ -198,18 +194,25 @@ public class AuthorizationSession {
                request.setResourceMatchingScope(_resourceMatchingScope);
                request.setAccessTime(new Date());
                request.setIgnoreDescendantDeny(_ignoreDescendantDeny);
-               _request = request;
+               return request;
+       }
+
+       AuthorizationSession buildRequest() {
+               verifyBuildable();
+               // session can be reused so reset its state
+               zapAuthorizationState();
+               _request = createRangerRequest();
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("Built request: " + request.toString());
+                       LOG.debug("Built request: " + _request.toString());
                }
                return this;
        }
-       
+
        AuthorizationSession authorize() {
                if (LOG.isDebugEnabled()) {
                        LOG.debug("==> AuthorizationSession.authorize: " + 
getRequestMessage());
                }
-               
+
                if (_request == null) {
                        String message = String.format("Invalid state 
transition: buildRequest() must be called before authorize().  This request 
would ultimately get denied.!");
                        throw new IllegalStateException(message);
@@ -223,7 +226,7 @@ public class AuthorizationSession {
                        }
                        _result = _authorizer.isAccessAllowed(_request, 
_auditHandler);
                }
-               
+
                if (LOG.isDebugEnabled()) {
                        boolean allowed = isAuthorized();
                        String reason = getDenialReason();
@@ -231,19 +234,19 @@ public class AuthorizationSession {
                }
                return this;
        }
-       
+
        void logCapturedEvents() {
                if (_auditHandler != null) {
                        List<AuthzAuditEvent> events = 
_auditHandler.getCapturedEvents();
                        _auditHandler.logAuthzAudits(events);
                }
        }
-       
+
        void publishResults() throws AccessDeniedException {
                if (LOG.isDebugEnabled()) {
                        LOG.debug("==> AuthorizationSession.publishResults()");
                }
-               
+
                boolean authorized = isAuthorized();
                if (_auditHandler != null && isAudited()) {
                        List<AuthzAuditEvent> events = null;
@@ -284,7 +287,7 @@ public class AuthorizationSession {
                        LOG.debug("<== AuthorizationSession.publishResults()");
                }
        }
-       
+
        boolean isAudited() {
 
                boolean audited = false;
@@ -313,7 +316,7 @@ public class AuthorizationSession {
                }
                return allowed;
        }
-       
+
        String getDenialReason() {
                String reason = "";
                if (_result == null) {
@@ -327,20 +330,21 @@ public class AuthorizationSession {
                }
                return reason;
        }
-       
+
        String requestToString() {
                return MoreObjects.toStringHelper(_request.getClass())
-                       .add("operation", _operation)
-                       .add("otherInformation", _otherInformation)
-                       .add("access", _access)
-                       .add("user", _user == null ? null : _user.getName())
-                       .add("groups", _groups)
-                       .add("auditHandler", _auditHandler == null ? null : 
_auditHandler.getClass().getSimpleName())
-                       .add(RangerHBaseResource.KEY_TABLE, _table)
-                       .add(RangerHBaseResource.KEY_COLUMN, _column)
-                       .add(RangerHBaseResource.KEY_COLUMN_FAMILY, 
_columnFamily)
-                       .add("resource-matching-scope", _resourceMatchingScope)
-                       .toString();
+                               .add("operation", _operation)
+                               .add("otherInformation", _otherInformation)
+                               .add("access", _access)
+                               .add("user", _user == null ? null : 
_user.getName())
+                               .add("groups", _groups)
+                               .add("auditHandler", _auditHandler == null ? 
null : _auditHandler.getClass().getSimpleName())
+                               .add(RangerHBaseResource.KEY_TABLE, _table)
+                               .add(RangerHBaseResource.KEY_COLUMN, _column)
+                               .add(RangerHBaseResource.KEY_COLUMN_FAMILY, 
_columnFamily)
+                               .add("resource-matching-scope", 
_resourceMatchingScope)
+                               .add("ignoreDescendantDeny", 
_ignoreDescendantDeny)
+                               .toString();
        }
 
        String getPrintableValue(String value) {
@@ -350,7 +354,7 @@ public class AuthorizationSession {
                        return "";
                }
        }
-       
+
        String getRequestMessage() {
                String format = "Access[%s] by user[%s] belonging to groups[%s] 
to table[%s] for column-family[%s], column[%s] triggered by operation[%s], 
otherInformation[%s]";
                String user = _userUtils.getUserAsString();
@@ -358,7 +362,7 @@ public class AuthorizationSession {
                                getPrintableValue(_columnFamily), 
getPrintableValue(_column), getPrintableValue(_operation), 
getPrintableValue(_otherInformation));
                return message;
        }
-       
+
        String getLogMessage(boolean allowed, String reason) {
                String format = " %s: status[%s], reason[%s]";
                String message = String.format(format, getRequestMessage(), 
allowed ? "allowed" : "denied", reason);
@@ -379,8 +383,13 @@ public class AuthorizationSession {
                _resourceMatchingScope = scope;
                return this;
        }
+
        AuthorizationSession ignoreDescendantDeny(boolean ignoreDescendantDeny) 
{
                _ignoreDescendantDeny = ignoreDescendantDeny;
                return this;
        }
-}
+
+       public boolean getPropertyIsColumnAuthOptimizationEnabled() {
+               return _authorizer.getPropertyIsColumnAuthOptimizationEnabled();
+       }
+}
\ No newline at end of file
diff --git 
a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationCoprocessor.java
 
b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationCoprocessor.java
index 2c9b6b80b..213e15d49 100644
--- 
a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationCoprocessor.java
+++ 
b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationCoprocessor.java
@@ -76,6 +76,7 @@ import 
org.apache.ranger.plugin.policyengine.RangerPolicyEngine;
 import org.apache.ranger.plugin.policyevaluator.RangerPolicyEvaluator;
 import org.apache.ranger.plugin.service.RangerBasePlugin;
 import org.apache.ranger.plugin.util.GrantRevokeRequest;
+import org.apache.ranger.plugin.util.ServicePolicies;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,11 +91,11 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
        private static final String GROUP_PREFIX = "@";
 
        private UserProvider userProvider;
-    private RegionCoprocessorEnvironment regionEnv;
+       private RegionCoprocessorEnvironment regionEnv;
        private Map<InternalScanner, String> scannerOwners = new 
MapMaker().weakKeys().makeMap();
        /** if we should check EXEC permissions */
        private boolean shouldCheckExecPermission;
-       
+
        /*
         * These are package level only for testability and aren't meant to be 
exposed outside via getters/setters or made available to derived classes.
         */
@@ -102,7 +103,17 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
        final HbaseUserUtils _userUtils = _factory.getUserUtils();
        final HbaseAuthUtils _authUtils = _factory.getAuthUtils();
        private static volatile RangerHBasePlugin hbasePlugin = null;
-       
+
+       public void setColumnAuthOptimizationEnabled(boolean enable) throws 
Exception {
+               RangerHBasePlugin plugin = hbasePlugin;
+               if (plugin!=null) {
+                       plugin.setColumnAuthOptimizationEnabled(enable);
+               }
+               else {
+                       throw new Exception("Error while enabling column auth 
optimization");
+               }
+       }
+
        // Utilities Methods
        protected byte[] getTableName(RegionCoprocessorEnvironment e) {
                Region region = e.getRegion();
@@ -139,7 +150,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                                return true;
                        }
                }
-                       
+
                return false;
        }
        protected boolean isAccessForMetaTables(RegionCoprocessorEnvironment 
env) {
@@ -205,7 +216,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                return user;
        }
 
-       
+
        private String getRemoteAddress() {
                InetAddress remoteAddr = null;
                try {
@@ -237,7 +248,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
      String owner = scannerOwners.get(s);
      if (owner != null && !owner.equals(requestUserName)) {
        throw new AccessDeniedException("User '"+ requestUserName +"' is not 
the scanner owner!");
-     } 
+     }
        }
        /**
         * @param families
@@ -258,8 +269,16 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                                Collection<?> columnCollection = 
anEntry.getValue();
                                if (CollectionUtils.isEmpty(columnCollection)) {
                                        // family points to null map, OK.
+                                       // if column auth disabled, then also 
empty set is fine
+                                       if (LOG.isDebugEnabled()) {
+                                               
LOG.debug("RangerAuthorizationCoprocessor getColumnFamilies: columns are empty. 
" +
+                                                               "Setting 
columns to emptySet in familyMap");
+                                       }
                                        result.put(family, Collections.<String> 
emptySet());
                                } else {
+                                       if (LOG.isDebugEnabled()) {
+                                               
LOG.debug("RangerAuthorizationCoprocessor getColumnFamilies: columns exist");
+                                       }
                                        Iterator<String> columnIterator = new 
ColumnIterator(columnCollection);
                                        Set<String> columns = new 
HashSet<String>();
                                        try {
@@ -278,7 +297,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                }
                return result;
        }
-       
+
        static class ColumnFamilyAccessResult {
                final boolean _everythingIsAccessible;
                final boolean _somethingIsAccessible;
@@ -301,7 +320,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                        // cached values of access results
                        _filter = filter;
                }
-               
+
                @Override
                public String toString() {
                        return MoreObjects.toStringHelper(getClass())
@@ -313,12 +332,15 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                                        .add("denialReason", _denialReason)
                                        .add("filter", _filter)
                                        .toString();
-                       
+
                }
        }
-       
+
        ColumnFamilyAccessResult evaluateAccess(ObserverContext<?> ctx, String 
operation, Action action, final RegionCoprocessorEnvironment env,
-                                                                               
        final Map<byte[], ? extends Collection<?>> familyMap, String 
commandStr) throws AccessDeniedException {
+                       final Map<byte[], ? extends Collection<?>> familyMap, 
String commandStr) throws AccessDeniedException {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("evaluateAccess: 
isColumnAuthOptimizationEnabled="+hbasePlugin.getPropertyIsColumnAuthOptimizationEnabled());
+               }
 
                String access = _authUtils.getAccess(action);
                User user = getActiveUser(ctx);
@@ -393,7 +415,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                } else {
                        LOG.debug("evaluateAccess: Families collection not 
null.  Skipping table-level check, will do finer level check");
                }
-               
+
                boolean everythingIsAccessible = true;
                boolean somethingIsAccessible = false;
                /*
@@ -409,6 +431,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                Set<String> familesAccessAllowed = new HashSet<String>();
                Set<String> familesAccessDenied = new HashSet<String>();
                Set<String> familesAccessIndeterminate = new HashSet<String>();
+               Set<String> familiesFullyAuthorized = new HashSet<>();
 
                for (Map.Entry<byte[], ? extends Collection<?>> anEntry : 
familyMap.entrySet()) {
                        String family = Bytes.toString(anEntry.getKey());
@@ -440,6 +463,8 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                                if (LOG.isDebugEnabled()) {
                                        LOG.debug("evaluateAccess: family level 
access for [" + family + "] is evaluated to " + isColumnFamilyAuthorized + ". 
Checking if [" + family + "] descendants have access.");
                                }
+                               // buildRequest again since 
resourceMatchingScope changed
+                               // widen the check to SELF_OR_DESCENDANTS and 
stop ignoring descendant denies (ignoreDescendantDeny=false)
                                
session.resourceMatchingScope(RangerAccessRequest.ResourceMatchingScope.SELF_OR_DESCENDANTS)
                                                .ignoreDescendantDeny(false)
                                                .buildRequest()
@@ -491,7 +516,51 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                                
session.resourceMatchingScope(RangerAccessRequest.ResourceMatchingScope.SELF);
                                session.ignoreDescendantDeny(true);
                        } else {
-                               LOG.debug("evaluateAccess: columns collection 
not empty.  Skipping Family level check, will do finer level access check.");
+                               boolean isColumnAuthOptimizationEnabled = 
hbasePlugin.getPropertyIsColumnAuthOptimizationEnabled();
+                               if(LOG.isDebugEnabled()) {
+                                       LOG.debug("evaluateAccess: columns 
collection not empty." +
+                                                       " Skipping Family level 
check, will do finer level access check for columns.");
+                               }
+                               if (isColumnAuthOptimizationEnabled) {
+                                       session.column(null)
+                                                       .buildRequest()
+                                                       .authorize();
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug(
+                                                               
"evaluateAccess: isColumnAuthOptimizationEnabled={}, 
isColumnFamilyAuthorized={}",
+                                                               
isColumnAuthOptimizationEnabled, session.isAuthorized());
+                                       }
+                                       if(session.isAuthorized()) {
+
+                                               //check if column family fully 
authorized i.e. no deny for columns
+                                               session.column(null)
+                                                               
.resourceMatchingScope(RangerAccessRequest.ResourceMatchingScope.SELF_OR_DESCENDANTS)
+                                                               
.ignoreDescendantDeny(false)
+                                                               .buildRequest()
+                                                               .authorize();
+
+                                               boolean 
isColumnFamilyAndDescendantsAuthorized = session.isAuthorized();
+                                               AuthzAuditEvent auditEvent = 
auditHandler.getAndDiscardMostRecentEvent();
+                                               // reset ResourceMatchingScope 
to SELF, ignoreDescendantDeny to true
+                                               
session.resourceMatchingScope(RangerAccessRequest.ResourceMatchingScope.SELF)
+                                                               
.ignoreDescendantDeny(true);
+                                               if (LOG.isDebugEnabled()) {
+                                                       LOG.debug(
+                                                                       
"evaluateAccess: isColumnAuthOptimizationEnabled={}, 
isColumnFamilyAndDescendantsAuthorized={}",
+                                                                       
isColumnAuthOptimizationEnabled, isColumnFamilyAndDescendantsAuthorized);
+                                               }
+                                               if 
(isColumnFamilyAndDescendantsAuthorized) {
+                                                       
familiesFullyAuthorized.add(family);
+                                                       if (auditEvent != null) 
{
+                                                               if 
(LOG.isDebugEnabled()) {
+                                                                       
LOG.debug("evaluateAccess: isColumnAuthOptimizationEnabled ={}, adding family 
{} to familiesFullyAuthorized", isColumnAuthOptimizationEnabled, family);
+                                                               }
+                                                               
familyLevelAccessEvents.add(auditEvent);
+                                                       }
+                                                       continue;
+                                               }
+                                       }
+                               }
                                Set<String> accessibleColumns = new 
HashSet<String>(); // will be used in to populate our results cache for the 
filter
                                Iterator<String> columnIterator = new 
ColumnIterator(columns);
                                while (columnIterator.hasNext()) {
@@ -499,6 +568,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                                         if (LOG.isDebugEnabled()) {
                                                LOG.debug("evaluateAccess: 
Processing column: " + column);
                                        }
+                                       //buildRequest required again since now 
column is being set
                                        session.column(column)
                                                .buildRequest()
                                                .authorize();
@@ -533,7 +603,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                        }
                }
                // Cache of auth results are encapsulated the in the filter. 
Not every caller of the function uses it - only preGet and preOpt will.
-               RangerAuthorizationFilter filter = new 
RangerAuthorizationFilter(session, familesAccessAllowed, familesAccessDenied, 
familesAccessIndeterminate, columnsAccessAllowed);
+               RangerAuthorizationFilter filter = new 
RangerAuthorizationFilter(session, familesAccessAllowed, familesAccessDenied, 
familesAccessIndeterminate, columnsAccessAllowed, familiesFullyAuthorized);
                result = new ColumnFamilyAccessResult(everythingIsAccessible, 
somethingIsAccessible, authorizedEvents, familyLevelAccessEvents, deniedEvent, 
denialReason, filter);
                if (LOG.isDebugEnabled()) {
                        String message = String.format(messageTemplate, 
userName, operation, access, colFamiliesForDebugLoggingOnly, result.toString());
@@ -577,7 +647,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                        }
                }
        }
-       
+
        Filter combineFilters(Filter filter, Filter existingFilter) {
                Filter combinedFilter = filter;
                if (existingFilter != null) {
@@ -611,7 +681,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                        RangerPerfTracer.log(perf);
                }
        }
-       
+
        /**
         * This could run s
         * @param operation
@@ -630,7 +700,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                        String message = String.format(format, "Entering", 
operation, otherInformation, access, table, columnFamily, column);
                        LOG.debug(message);
                }
-               
+
                final String format =  "authorizeAccess: %s: Operation[%s], 
Info[%s], access[%s], table[%s], columnFamily[%s], column[%s], allowed[%s], 
reason[%s]";
                if (canSkipAccessCheck(user, operation, access, table)) {
                        if (LOG.isDebugEnabled()) {
@@ -640,7 +710,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                        return;
                }
 
-               
+
                HbaseAuditHandler auditHandler = _factory.getAuditHandler();
                AuthorizationSession session = new 
AuthorizationSession(hbasePlugin)
                        .operation(operation)
@@ -654,20 +724,20 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                        .column(column)
                        .buildRequest()
                        .authorize();
-               
+
                if (LOG.isDebugEnabled()) {
                        boolean allowed = session.isAuthorized();
                        String reason = session.getDenialReason();
                        String message = String.format(format, "Exiting", 
operation, otherInformation, access, table, columnFamily, column, allowed, 
reason);
                        LOG.debug(message);
                }
-               
+
                session.publishResults();
        }
-       
+
        boolean canSkipAccessCheck(User user, final String operation, String 
access, final String table)
                        throws AccessDeniedException {
-               
+
                boolean result = false;
                if (user == null) {
                        String message = "Unexpeceted: User is null: access 
denied, not audited!";
@@ -679,10 +749,10 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                } else {
                        LOG.debug("Can't skip access checks");
                }
-               
+
                return result;
        }
-       
+
        boolean canSkipAccessCheck(User user, final String operation, String 
access, final RegionCoprocessorEnvironment regionServerEnv) throws 
AccessDeniedException {
 
                // read access to metadata tables is always allowed and isn't 
audited.
@@ -708,7 +778,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                }
                return false;
        }
-       
+
        boolean isAccessForMetadataRead(String access, String table) {
                if (_authUtils.isReadAccess(access) && isSpecialTable(table)) {
                        LOG.debug("isAccessForMetadataRead: Metadata tables 
read: access allowed!");
@@ -731,7 +801,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
 
                authorizeAccess(ctx, request, null, action, table, null, null);
        }
-       
+
        protected void requirePermission(ObserverContext<?> ctx, String 
request, byte[] aTableName, byte[] aColumnFamily, byte[] aQualifier, 
Permission.Action action) throws AccessDeniedException {
 
                String table = Bytes.toString(aTableName);
@@ -740,7 +810,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
 
                authorizeAccess(ctx, request, null, action, table, 
columnFamily, column);
        }
-       
+
        protected void requirePermission(ObserverContext<?> ctx, String 
request, Permission.Action perm, RegionCoprocessorEnvironment env, 
Collection<byte[]> families) throws IOException {
                HashMap<byte[], Set<byte[]>> familyMap = new HashMap<byte[], 
Set<byte[]>>();
 
@@ -929,7 +999,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
        @Override
        public Result 
preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment 
increment) throws IOException {
                requirePermission(c, "increment", TablePermission.Action.WRITE, 
c.getEnvironment(), increment.getFamilyCellMap().keySet());
-               
+
                return null;
        }
 
@@ -1123,7 +1193,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                if(plugin == null) {
                        synchronized(RangerAuthorizationCoprocessor.class) {
                                plugin = hbasePlugin;
-                               
+
                                if(plugin == null) {
                                        plugin = new RangerHBasePlugin(appType);
 
@@ -1135,7 +1205,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                                }
                        }
                }
-               
+
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Start of Coprocessor: [" + coprocessorType + 
"]");
                }
@@ -1145,7 +1215,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
        public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put 
put, WALEdit edit, Durability durability) throws IOException {
                requirePermission(c, "put", TablePermission.Action.WRITE, 
c.getEnvironment(), put.getFamilyCellMap());
        }
-       
+
        @Override
        public void preGetOp(final 
ObserverContext<RegionCoprocessorEnvironment> rEnv, final Get get, final 
List<Cell> result) throws IOException {
                if (LOG.isDebugEnabled()) {
@@ -1216,7 +1286,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                }
 
                checkGetTableInfoAccess(ctx, "getTableDescriptors", 
descriptors, regex, _authUtils.getAccess(Action.CREATE));
-               
+
                if (LOG.isDebugEnabled()) {
                        LOG.debug(String.format("<== 
postGetTableDescriptors(count(tableNamesList)=%s, count(descriptors)=%s, 
regex=%s)", tableNamesList == null ? 0 : tableNamesList.size(),
                                        descriptors == null ? 0 : 
descriptors.size(), regex));
@@ -1271,7 +1341,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
 
                if(UpdateRangerPoliciesOnGrantRevoke) {
                        GrantRevokeRequest grData = null;
-       
+
                        try {
                                grData = createGrantData(request);
 
@@ -1507,7 +1577,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                                nameSpace = namepsacePermission.getNamespace();
                        break;
                }
-               
+
                if(StringUtil.isEmpty(nameSpace) && 
StringUtil.isEmpty(tableName) && StringUtil.isEmpty(colFamily) && 
StringUtil.isEmpty(qualifier)) {
                        throw new Exception("grant(): 
namespace/table/columnFamily/columnQualifier not specified");
                }
@@ -1659,7 +1729,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
                ret.setForwardedAddresses(null);//TODO: Need to check with Knox 
proxy how they handle forwarded add.
                ret.setRemoteIPAddress(getRemoteAddress());
                ret.setRequestData(up.toString());
-               
+
                if(userName.startsWith(GROUP_PREFIX)) {
                        
ret.getGroups().add(userName.substring(GROUP_PREFIX.length()));
                } else {
@@ -1872,6 +1942,7 @@ public class RangerAuthorizationCoprocessor implements 
AccessControlService.Inte
 class RangerHBasePlugin extends RangerBasePlugin {
        private static final Logger LOG = 
LoggerFactory.getLogger(RangerHBasePlugin.class);
        boolean isHBaseShuttingDown  = false;
+       private boolean isColumnAuthOptimizationEnabled = false;
 
        public RangerHBasePlugin(String appType) {
                super("hbase", appType);
@@ -1894,6 +1965,18 @@ class RangerHBasePlugin extends RangerBasePlugin {
                }
                return ret;
        }
+       @Override
+       public void setPolicies(ServicePolicies policies) {
+               super.setPolicies(policies);
+               this.isColumnAuthOptimizationEnabled = 
Boolean.parseBoolean(this.getServiceConfigs().get(RangerHadoopConstants.HBASE_COLUMN_AUTH_OPTIMIZATION));
+               
LOG.info("isColumnAuthOptimizationEnabled="+this.isColumnAuthOptimizationEnabled);
+       }
+       public boolean getPropertyIsColumnAuthOptimizationEnabled(){
+               return this.isColumnAuthOptimizationEnabled;
+       }
+       public void setColumnAuthOptimizationEnabled(boolean enable){
+               this.isColumnAuthOptimizationEnabled = enable;
+       }
 }
 
 
diff --git 
a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationFilter.java
 
b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationFilter.java
index 3cccd0169..7ddd0910b 100644
--- 
a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationFilter.java
+++ 
b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationFilter.java
@@ -40,16 +40,18 @@ public class RangerAuthorizationFilter extends FilterBase {
        final Set<String> _familiesAccessDenied;
        final Set<String> _familiesAccessIndeterminate;
        final Map<String, Set<String>> _columnsAccessAllowed;
+       final Set<String> _familiesFullyAuthorized;
        final AuthorizationSession _session;
        final HbaseAuditHandler _auditHandler = 
HbaseFactory.getInstance().getAuditHandler();
 
        public RangerAuthorizationFilter(AuthorizationSession session, 
Set<String> familiesAccessAllowed, Set<String> familiesAccessDenied, 
Set<String> familiesAccessIndeterminate,
-                                                                        
Map<String, Set<String>> columnsAccessAllowed) {
+                                                                        
Map<String, Set<String>> columnsAccessAllowed, Set<String> 
familiesFullyAuthorized) {
                // the class assumes that all of these can be empty but none of 
these can be null
                _familiesAccessAllowed = familiesAccessAllowed;
                _familiesAccessDenied = familiesAccessDenied;
                _familiesAccessIndeterminate = familiesAccessIndeterminate;
                _columnsAccessAllowed = columnsAccessAllowed;
+               _familiesFullyAuthorized = familiesFullyAuthorized;
                // this session should have everything set on it except family 
and column which would be altered based on need
                _session = session;
                // we don't want to audit denial, so we need to make sure the 
hander is what we need it to be.
@@ -88,6 +90,9 @@ public class RangerAuthorizationFilter extends FilterBase {
                        LOG.warn("filterKeyValue: Unexpected - null/empty 
family! Access denied!");
                } else if (_familiesAccessDenied.contains(family)) {
                        LOG.debug("filterKeyValue: family found in access 
denied families cache.  Access denied.");
+               } else if 
(_session.getPropertyIsColumnAuthOptimizationEnabled() && 
_familiesFullyAuthorized.contains(family)){
+                       LOG.debug("filterKeyValue: 
ColumnAuthOptimizationEnabled and family found in fully authorized families 
cache.  Column authorization is not required");
+                       result = ReturnCode.INCLUDE;
                } else if (_columnsAccessAllowed.containsKey(family)) {
                        LOG.debug("filterKeyValue: family found in column level 
access results cache.");
                        if (_columnsAccessAllowed.get(family).contains(column)) 
{
diff --git 
a/hbase-agent/src/test/java/org/apache/ranger/authorization/hbase/AuthorizationSessionTest.java
 
b/hbase-agent/src/test/java/org/apache/ranger/authorization/hbase/AuthorizationSessionTest.java
index 2c8f9af52..2be256ffd 100644
--- 
a/hbase-agent/src/test/java/org/apache/ranger/authorization/hbase/AuthorizationSessionTest.java
+++ 
b/hbase-agent/src/test/java/org/apache/ranger/authorization/hbase/AuthorizationSessionTest.java
@@ -23,7 +23,6 @@ package org.apache.ranger.authorization.hbase;
 import static org.mockito.Mockito.*;
 
 import org.apache.hadoop.hbase.security.User;
-import org.apache.ranger.plugin.service.RangerBasePlugin;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -71,7 +70,7 @@ public class AuthorizationSessionTest {
 
        @Test
        public void testIsBuildable() {
-               RangerBasePlugin plugin = new RangerBasePlugin("hbase", 
"hbase");
+               RangerHBasePlugin plugin = new RangerHBasePlugin( "hbase");
                AuthorizationSession session = new AuthorizationSession(plugin);
                try {
                        session.verifyBuildable();
@@ -136,7 +135,7 @@ public class AuthorizationSessionTest {
 
        @Test
        public void testAuthorize() {
-               RangerBasePlugin plugin = new RangerBasePlugin("hbase", 
"hbase");
+               RangerHBasePlugin plugin = new RangerHBasePlugin( "hbase");
                
                User user = mock(User.class);
                when(user.getShortName()).thenReturn("user1");
diff --git 
a/hbase-agent/src/test/java/org/apache/ranger/authorization/hbase/HBaseRangerAuthorizationTest.java
 
b/hbase-agent/src/test/java/org/apache/ranger/authorization/hbase/HBaseRangerAuthorizationTest.java
index b559c338a..d973079c7 100644
--- 
a/hbase-agent/src/test/java/org/apache/ranger/authorization/hbase/HBaseRangerAuthorizationTest.java
+++ 
b/hbase-agent/src/test/java/org/apache/ranger/authorization/hbase/HBaseRangerAuthorizationTest.java
@@ -20,31 +20,23 @@ package org.apache.ranger.authorization.hbase;
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.SnapshotDescription;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.security.access.AccessControlClient;
-import org.apache.hadoop.hbase.security.access.NamespacePermission;
-import org.apache.hadoop.hbase.security.access.Permission;
-import org.apache.hadoop.hbase.security.access.UserPermission;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
+import org.apache.hadoop.hbase.security.access.*;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Assert;
@@ -53,12 +45,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A custom RangerAdminClient is plugged into Ranger in turn, which loads 
security policies from a local file. These policies were 
+ * A custom RangerAdminClient is plugged into Ranger in turn, which loads 
security policies from a local file. These policies were
  * generated in the Ranger Admin UI for a service called "HBaseTest":
- * 
+ *
  * a) The "logged in" user can do anything
  * b) The IT group can read and write to the "temp" table, but only the 
"colfam1" column family.
  * c) The QA group can read and write to tables in "test_namespace" namespace.
+ * d) The IT group can read and write to the "temp6" table's "colfam1" column 
family and "col1" column only.
+ * e) The IT group can read the "colfam1" column family of the "temp7" and 
"temp8" tables, but is denied access to the "col1" column in that column family.
+ * f) The IT2 group can read the "colfam1" column family of the "temp7" table, 
and no columns in that column family are denied to it.
  *
  * In addition we have some TAG based policies created in Atlas and synced 
into Ranger:
  *
@@ -68,7 +63,7 @@ import org.slf4j.LoggerFactory;
  * the "temp3" table.
  *
  * Policies available from admin via:
- * 
+ *
  * http://localhost:6080/service/plugins/policies/download/cl1_hbase
  */
 @org.junit.Ignore
@@ -79,23 +74,26 @@ public class HBaseRangerAuthorizationTest {
     private static int port;
     private static HBaseTestingUtility utility;
 
+
     @org.junit.BeforeClass
     public static void setup() throws Exception {
         port = getFreePort();
-        
+
         utility = new HBaseTestingUtility();
         
utility.getConfiguration().set("test.hbase.zookeeper.property.clientPort", "" + 
port);
+        utility.getConfiguration().set("hbase.master.hostname", "localhost");
         utility.getConfiguration().set("hbase.master.port", "" + 
getFreePort());
         utility.getConfiguration().set("hbase.master.info.port", "" + 
getFreePort());
+        utility.getConfiguration().set("hbase.unsafe.regionserver.hostname", 
"localhost");
         utility.getConfiguration().set("hbase.regionserver.port", "" + 
getFreePort());
         utility.getConfiguration().set("hbase.regionserver.info.port", "" + 
getFreePort());
         utility.getConfiguration().set("zookeeper.znode.parent", 
"/hbase-unsecure");
 
         // Enable authorization
         utility.getConfiguration().set("hbase.security.authorization", "true");
-        utility.getConfiguration().set("hbase.coprocessor.master.classes", 
+        utility.getConfiguration().set("hbase.coprocessor.master.classes",
             
"org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor");
-        utility.getConfiguration().set("hbase.coprocessor.region.classes", 
+        utility.getConfiguration().set("hbase.coprocessor.region.classes",
             
"org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor");
 
         utility.startMiniCluster();
@@ -112,30 +110,21 @@ public class HBaseRangerAuthorizationTest {
 
         // Create a table
         if (!admin.tableExists(TableName.valueOf("default:temp"))) {
-            HTableDescriptor tableDescriptor = new 
HTableDescriptor(TableName.valueOf("default:temp"));
+            TableDescriptorBuilder tableDescriptor = 
TableDescriptorBuilder.newBuilder(TableName.valueOf("default:temp"));
 
             // Adding column families to table descriptor
-            tableDescriptor.addFamily(new HColumnDescriptor("colfam1"));
-            tableDescriptor.addFamily(new HColumnDescriptor("colfam2"));
+            
tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("colfam1".getBytes()).build());
+            
tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("colfam2".getBytes()).build());
 
-            admin.createTable(tableDescriptor);
+            admin.createTable(tableDescriptor.build());
         }
 
-               if (!admin.tableExists(TableName.valueOf("default:temp5"))) {
-                       HTableDescriptor tableDescriptor = new 
HTableDescriptor(TableName.valueOf("default:temp5"));
-
-                       // Adding column families to table descriptor
-                       tableDescriptor.addFamily(new 
HColumnDescriptor("colfam1"));
-
-                       admin.createTable(tableDescriptor);
-               }
-
         // Add a new row
         Put put = new Put(Bytes.toBytes("row1"));
         put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"), 
Bytes.toBytes("val1"));
         Table table = conn.getTable(TableName.valueOf("temp"));
         table.put(put);
-        
+
         put = new Put(Bytes.toBytes("row1"));
         put.addColumn(Bytes.toBytes("colfam2"), Bytes.toBytes("col1"), 
Bytes.toBytes("val2"));
         table.put(put);
@@ -146,15 +135,40 @@ public class HBaseRangerAuthorizationTest {
 
         // Create a table
         if (!admin.tableExists(TableName.valueOf("test_namespace", "temp"))) {
-            HTableDescriptor tableDescriptor = new 
HTableDescriptor(TableName.valueOf("test_namespace", "temp"));
+            TableDescriptorBuilder tableDescriptor =  
TableDescriptorBuilder.newBuilder(TableName.valueOf("test_namespace", "temp"));
 
             // Adding column families to table descriptor
-            tableDescriptor.addFamily(new HColumnDescriptor("colfam1"));
-            tableDescriptor.addFamily(new HColumnDescriptor("colfam2"));
+            
tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("colfam1".getBytes()).build());
+            
tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("colfam2".getBytes()).build());
 
-            admin.createTable(tableDescriptor);
+            admin.createTable(tableDescriptor.build());
         }
 
+        if (!admin.tableExists(TableName.valueOf("default:temp5"))) {
+            TableDescriptorBuilder tableDescriptor =  
TableDescriptorBuilder.newBuilder(TableName.valueOf("default:temp5"));
+            // Adding column families to table descriptor
+            
tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("colfam1".getBytes()).build());
+            admin.createTable(tableDescriptor.build());
+        }
+        if (!admin.tableExists(TableName.valueOf("default:temp6"))) {
+            TableDescriptorBuilder tableDescriptor =  
TableDescriptorBuilder.newBuilder(TableName.valueOf("default:temp6"));
+            // Adding column families to table descriptor
+            
tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("colfam1".getBytes()).build());
+            admin.createTable(tableDescriptor.build());
+        }
+        if (!admin.tableExists(TableName.valueOf("default:temp7"))) {
+            TableDescriptorBuilder tableDescriptor =  
TableDescriptorBuilder.newBuilder(TableName.valueOf("default:temp7"));
+            // Adding column families to table descriptor
+            
tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("colfam1".getBytes()).build());
+            admin.createTable(tableDescriptor.build());
+        }
+        table = conn.getTable(TableName.valueOf("default", "temp7"));
+
+        // Add a new row
+        put = new Put(Bytes.toBytes("row1"));
+        put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"), 
Bytes.toBytes("val1"));
+        table.put(put);
+
         table = conn.getTable(TableName.valueOf("test_namespace", "temp"));
 
         // Add a new row
@@ -166,33 +180,51 @@ public class HBaseRangerAuthorizationTest {
         put.addColumn(Bytes.toBytes("colfam2"), Bytes.toBytes("col1"), 
Bytes.toBytes("val2"));
         table.put(put);
 
+        if (!admin.tableExists(TableName.valueOf("default:temp8"))) {
+            TableDescriptorBuilder tableDescriptor =  
TableDescriptorBuilder.newBuilder(TableName.valueOf("default:temp8"));
+            // Adding column families to table descriptor
+            
tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("colfam1".getBytes()).build());
+            admin.createTable(tableDescriptor.build());
+        }
+        table = conn.getTable(TableName.valueOf("default", "temp8"));
+
+        // Add a new row
+        put = new Put(Bytes.toBytes("row1"));
+        put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"), 
Bytes.toBytes("val1"));
+        table.put(put);
+
+        // Add a new row
+        put = new Put(Bytes.toBytes("row2"));
+        put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col2"), 
Bytes.toBytes("val2"));
+        table.put(put);
+
         conn.close();
     }
-    
+
     @org.junit.AfterClass
     public static void cleanup() throws Exception {
         utility.shutdownMiniCluster();
     }
-    
+
     @Test
     public void testReadTablesAsProcessOwner() throws Exception {
         final Configuration conf = HBaseConfiguration.create();
         conf.set("hbase.zookeeper.quorum", "localhost");
         conf.set("hbase.zookeeper.property.clientPort", "" + port);
         conf.set("zookeeper.znode.parent", "/hbase-unsecure");
-        
+
         Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin();
 
-        HTableDescriptor[] tableDescriptors = admin.listTables();
-        for (HTableDescriptor desc : tableDescriptors) {
+        List<TableDescriptor> tableDescriptors = admin.listTableDescriptors();
+        for (TableDescriptor desc : tableDescriptors) {
             LOG.info("Found table:[" + desc.getTableName().getNameAsString() + 
"]");
         }
-        Assert.assertEquals(3, tableDescriptors.length);
+        Assert.assertEquals(6, tableDescriptors.size());
 
         conn.close();
     }
-    
+
     // This should fail as the "IT" group only has read privileges, not admin 
privileges, on the table "temp"
     @Test
     public void testReadTablesAsGroupIT() throws Exception {
@@ -200,7 +232,7 @@ public class HBaseRangerAuthorizationTest {
         conf.set("hbase.zookeeper.quorum", "localhost");
         conf.set("hbase.zookeeper.property.clientPort", "" + port);
         conf.set("zookeeper.znode.parent", "/hbase-unsecure");
-        
+
         String user = "IT";
 
         UserGroupInformation ugi = 
UserGroupInformation.createUserForTesting(user, new String[] {"IT"});
@@ -208,40 +240,40 @@ public class HBaseRangerAuthorizationTest {
             public Void run() throws Exception {
                 Connection conn = ConnectionFactory.createConnection(conf);
                 Admin admin = conn.getAdmin();
-                
-                HTableDescriptor[] tableDescriptors = admin.listTables();
-                for (HTableDescriptor desc : tableDescriptors) {
+
+                List <TableDescriptor> tableDescriptors = 
admin.listTableDescriptors();
+                for (TableDescriptor desc : tableDescriptors) {
                     LOG.info("Found table:[" + 
desc.getTableName().getNameAsString() + "]");
                 }
-                Assert.assertEquals(0, tableDescriptors.length);
-        
+                Assert.assertEquals(0, tableDescriptors.size());
+
                 conn.close();
                 return null;
             }
         });
     }
-    
+
     @Test
     public void testCreateAndDropTables() throws Exception {
         final Configuration conf = HBaseConfiguration.create();
         conf.set("hbase.zookeeper.quorum", "localhost");
         conf.set("hbase.zookeeper.property.clientPort", "" + port);
         conf.set("zookeeper.znode.parent", "/hbase-unsecure");
-        
+
         Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin();
 
         // Create a new table as process owner
-        HTableDescriptor tableDescriptor = new 
HTableDescriptor(TableName.valueOf("temp2"));
+        TableDescriptorBuilder tableDescriptor = 
TableDescriptorBuilder.newBuilder(TableName.valueOf("temp2"));
 
         // Adding column families to table descriptor
-        tableDescriptor.addFamily(new HColumnDescriptor("colfam1"));
-        tableDescriptor.addFamily(new HColumnDescriptor("colfam2"));
+        
tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("colfam1".getBytes()).build());
+        
tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("colfam2".getBytes()).build());
 
-        admin.createTable(tableDescriptor);
+        admin.createTable(tableDescriptor.build());
 
         conn.close();
-        
+
         // Try to disable + delete the table as the "IT" group
         String user = "IT";
 
@@ -250,7 +282,7 @@ public class HBaseRangerAuthorizationTest {
             public Void run() throws Exception {
                 Connection conn = ConnectionFactory.createConnection(conf);
                 Admin admin = conn.getAdmin();
-                
+
                 try {
                     admin.disableTable(TableName.valueOf("temp2"));
                     admin.deleteTable(TableName.valueOf("temp2"));
@@ -258,40 +290,40 @@ public class HBaseRangerAuthorizationTest {
                 } catch (IOException ex) {
                     // expected
                 }
-                
+
                 conn.close();
                 return null;
             }
         });
-        
+
         // Now disable and delete as process owner
         conn = ConnectionFactory.createConnection(conf);
         admin = conn.getAdmin();
         admin.disableTable(TableName.valueOf("temp2"));
         admin.deleteTable(TableName.valueOf("temp2"));
-        
+
         conn.close();
     }
-    
+
     @Test
     public void testReadRowAsProcessOwner() throws Exception {
         final Configuration conf = HBaseConfiguration.create();
         conf.set("hbase.zookeeper.quorum", "localhost");
         conf.set("hbase.zookeeper.property.clientPort", "" + port);
         conf.set("zookeeper.znode.parent", "/hbase-unsecure");
-        
+
         Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TableName.valueOf("temp"));
-        
+
         // Read a row
         Get get = new Get(Bytes.toBytes("row1"));
         Result result = table.get(get);
         byte[] valResult = result.getValue(Bytes.toBytes("colfam1"), 
Bytes.toBytes("col1"));
         Assert.assertTrue(Arrays.equals(valResult, Bytes.toBytes("val1")));
-        
+
         conn.close();
     }
-    
+
     @Test
     public void testReadRowAsGroupIT() throws Exception {
         final Configuration conf = HBaseConfiguration.create();
@@ -306,7 +338,7 @@ public class HBaseRangerAuthorizationTest {
             public Void run() throws Exception {
                 Connection conn = ConnectionFactory.createConnection(conf);
                 Table table = conn.getTable(TableName.valueOf("temp"));
-                
+
                 // Read a row
                 Get get = new Get(Bytes.toBytes("row1"));
                 Result result = table.get(get);
@@ -318,7 +350,7 @@ public class HBaseRangerAuthorizationTest {
             }
         });
     }
-    
+
     // This should fail as "public" doesn't have the right to read the table
     @Test
     public void testReadRowAsGroupPublic() throws Exception {
@@ -334,7 +366,7 @@ public class HBaseRangerAuthorizationTest {
             public Void run() throws Exception {
                 Connection conn = ConnectionFactory.createConnection(conf);
                 Table table = conn.getTable(TableName.valueOf("temp"));
-                
+
                 // Read a row
                 try {
                     Get get = new Get(Bytes.toBytes("row1"));
@@ -350,14 +382,14 @@ public class HBaseRangerAuthorizationTest {
             }
         });
     }
-    
+
     @Test
     public void testReadRowFromColFam2AsProcessOwner() throws Exception {
         final Configuration conf = HBaseConfiguration.create();
         conf.set("hbase.zookeeper.quorum", "localhost");
         conf.set("hbase.zookeeper.property.clientPort", "" + port);
         conf.set("zookeeper.znode.parent", "/hbase-unsecure");
-        
+
         Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TableName.valueOf("temp"));
 
@@ -369,7 +401,7 @@ public class HBaseRangerAuthorizationTest {
 
         conn.close();
     }
-    
+
     @Test
     public void testReadRowFromColFam2AsGroupIT() throws Exception {
         final Configuration conf = HBaseConfiguration.create();
@@ -384,7 +416,7 @@ public class HBaseRangerAuthorizationTest {
             public Void run() throws Exception {
                 Connection conn = ConnectionFactory.createConnection(conf);
                 Table table = conn.getTable(TableName.valueOf("temp"));
-                
+
                 // Read a row
                 Get get = new Get(Bytes.toBytes("row1"));
                 Result result = table.get(get);
@@ -396,25 +428,25 @@ public class HBaseRangerAuthorizationTest {
             }
         });
     }
-    
+
     @Test
     public void testWriteRowAsProcessOwner() throws Exception {
         final Configuration conf = HBaseConfiguration.create();
         conf.set("hbase.zookeeper.quorum", "localhost");
         conf.set("hbase.zookeeper.property.clientPort", "" + port);
         conf.set("zookeeper.znode.parent", "/hbase-unsecure");
-        
+
         Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TableName.valueOf("temp"));
-        
+
         // Add a new row
         Put put = new Put(Bytes.toBytes("row2"));
         put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"), 
Bytes.toBytes("val2"));
         table.put(put);
-        
+
         conn.close();
     }
-    
+
     @Test
     public void testWriteRowAsGroupIT() throws Exception {
         final Configuration conf = HBaseConfiguration.create();
@@ -429,18 +461,18 @@ public class HBaseRangerAuthorizationTest {
             public Void run() throws Exception {
                 Connection conn = ConnectionFactory.createConnection(conf);
                 Table table = conn.getTable(TableName.valueOf("temp"));
-                
+
                 // Add a new row
                 Put put = new Put(Bytes.toBytes("row3"));
                 put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"), 
Bytes.toBytes("val2"));
                 table.put(put);
-                
+
                 conn.close();
                 return null;
             }
         });
     }
-    
+
     @Test
     public void testWriteRowAsGroupPublic() throws Exception {
         final Configuration conf = HBaseConfiguration.create();
@@ -455,7 +487,7 @@ public class HBaseRangerAuthorizationTest {
             public Void run() throws Exception {
                 Connection conn = ConnectionFactory.createConnection(conf);
                 Table table = conn.getTable(TableName.valueOf("temp"));
-                
+
                 // Add a new row
                 try {
                     Put put = new Put(Bytes.toBytes("row3"));
@@ -465,13 +497,13 @@ public class HBaseRangerAuthorizationTest {
                 } catch (IOException ex) {
                     // expected
                 }
-                
+
                 conn.close();
                 return null;
             }
         });
     }
-    
+
     @Test
     public void testWriteRowInColFam2AsGroupIT() throws Exception {
         final Configuration conf = HBaseConfiguration.create();
@@ -486,7 +518,7 @@ public class HBaseRangerAuthorizationTest {
             public Void run() throws Exception {
                 Connection conn = ConnectionFactory.createConnection(conf);
                 Table table = conn.getTable(TableName.valueOf("temp"));
-                
+
                 // Add a new row
                 try {
                     Put put = new Put(Bytes.toBytes("row3"));
@@ -496,46 +528,46 @@ public class HBaseRangerAuthorizationTest {
                 } catch (IOException ex) {
                     // expected
                 }
-                
+
                 conn.close();
                 return null;
             }
         });
     }
-    
+
     @Test
     public void testReadRowInAnotherTable() throws Exception {
         final Configuration conf = HBaseConfiguration.create();
         conf.set("hbase.zookeeper.quorum", "localhost");
         conf.set("hbase.zookeeper.property.clientPort", "" + port);
         conf.set("zookeeper.znode.parent", "/hbase-unsecure");
-        
+
         Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin();
 
         // Create a new table as process owner
-        HTableDescriptor tableDescriptor = new 
HTableDescriptor(TableName.valueOf("temp4"));
+        TableDescriptorBuilder tableDescriptor = 
TableDescriptorBuilder.newBuilder(TableName.valueOf("temp4"));
 
         // Adding column families to table descriptor
-        tableDescriptor.addFamily(new HColumnDescriptor("colfam1"));
-        tableDescriptor.addFamily(new HColumnDescriptor("colfam2"));
+        
tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("colfam1".getBytes()).build());
+        
tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("colfam2".getBytes()).build());
 
-        admin.createTable(tableDescriptor);
+        admin.createTable(tableDescriptor.build());
 
         // Write a value
         Put put = new Put(Bytes.toBytes("row1"));
         put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"), 
Bytes.toBytes("val1"));
         Table table = conn.getTable(TableName.valueOf("temp4"));
         table.put(put);
-        
+
         // Read a row
         Get get = new Get(Bytes.toBytes("row1"));
         Result result = table.get(get);
         byte[] valResult = result.getValue(Bytes.toBytes("colfam2"), 
Bytes.toBytes("col1"));
         Assert.assertNull(valResult);
-        
+
         conn.close();
-        
+
         // Now try to read the row as group "IT" - it should fail as "IT" can 
only read from table "temp"
         String user = "IT";
 
@@ -544,7 +576,7 @@ public class HBaseRangerAuthorizationTest {
             public Void run() throws Exception {
                 Connection conn = ConnectionFactory.createConnection(conf);
                 Table table = conn.getTable(TableName.valueOf("temp4"));
-                
+
                 // Read a row
                 try {
                     Get get = new Get(Bytes.toBytes("row1"));
@@ -559,48 +591,48 @@ public class HBaseRangerAuthorizationTest {
                 return null;
             }
         });
-        
+
         // Now disable and delete as process owner
         conn = ConnectionFactory.createConnection(conf);
         admin = conn.getAdmin();
         admin.disableTable(TableName.valueOf("temp4"));
         admin.deleteTable(TableName.valueOf("temp4"));
-        
+
         conn.close();
     }
-    
+
     @Test
     public void testDeleteRowAsProcessOwner() throws Exception {
         final Configuration conf = HBaseConfiguration.create();
         conf.set("hbase.zookeeper.quorum", "localhost");
         conf.set("hbase.zookeeper.property.clientPort", "" + port);
         conf.set("zookeeper.znode.parent", "/hbase-unsecure");
-        
+
         Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TableName.valueOf("temp"));
-        
+
         // Add a new row
         Put put = new Put(Bytes.toBytes("row4"));
         put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"), 
Bytes.toBytes("val2"));
         table.put(put);
-        
+
         // Delete the new row
         Delete delete = new Delete(Bytes.toBytes("row4"));
         table.delete(delete);
-        
+
         conn.close();
     }
-    
+
     @Test
     public void testDeleteRowAsGroupIT() throws Exception {
         final Configuration conf = HBaseConfiguration.create();
         conf.set("hbase.zookeeper.quorum", "localhost");
         conf.set("hbase.zookeeper.property.clientPort", "" + port);
         conf.set("zookeeper.znode.parent", "/hbase-unsecure");
-        
+
         Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TableName.valueOf("temp"));
-        
+
         // Add a new row (as process owner)
         Put put = new Put(Bytes.toBytes("row5"));
         put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"), 
Bytes.toBytes("val2"));
@@ -613,7 +645,7 @@ public class HBaseRangerAuthorizationTest {
             public Void run() throws Exception {
                 Connection conn = ConnectionFactory.createConnection(conf);
                 Table table = conn.getTable(TableName.valueOf("temp"));
-                
+
                 try {
                     // Delete the new row
                     Delete delete = new Delete(Bytes.toBytes("row5"));
@@ -622,16 +654,16 @@ public class HBaseRangerAuthorizationTest {
                 } catch (IOException ex) {
                     // expected
                 }
-                
+
                 conn.close();
                 return null;
             }
         });
-        
+
         // Delete the new row (as process owner)
         Delete delete = new Delete(Bytes.toBytes("row5"));
         table.delete(delete);
-        
+
         conn.close();
     }
 
@@ -666,7 +698,8 @@ public class HBaseRangerAuthorizationTest {
                 admin.snapshot("test_snapshot", tableName);
 
                 // Clone snapshot
-                HTableDescriptor tableDescriptor = new 
HTableDescriptor(TableName.valueOf("test_namespace", "temp_cloned"));
+                TableDescriptor tableDescriptor = 
TableDescriptorBuilder.newBuilder(TableName.valueOf("test_namespace", 
"temp_cloned")).build();
+
                 TableName newTableName = tableDescriptor.getTableName();
                 admin.cloneSnapshot("test_snapshot", newTableName);
                 admin.disableTable(newTableName);
@@ -746,11 +779,11 @@ public class HBaseRangerAuthorizationTest {
         conf.set("hbase.zookeeper.property.clientPort", "" + port);
         conf.set("zookeeper.znode.parent", "/hbase-unsecure");
 
-        final HTableDescriptor tableDescriptor = new 
HTableDescriptor(TableName.valueOf("temp3"));
+        final TableDescriptorBuilder tableDescriptor = 
TableDescriptorBuilder.newBuilder(TableName.valueOf("temp3"));
 
         // Adding column families to table descriptor
-        tableDescriptor.addFamily(new HColumnDescriptor("colfam1"));
-        tableDescriptor.addFamily(new HColumnDescriptor("colfam2"));
+        
tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("colfam1".getBytes()).build());
+        
tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("colfam2".getBytes()).build());
 
         // Try to create a "temp3" table as the "IT" group - this should fail
         String user = "IT";
@@ -763,7 +796,7 @@ public class HBaseRangerAuthorizationTest {
                 Admin admin = conn.getAdmin();
 
                 try {
-                    admin.createTable(tableDescriptor);
+                    admin.createTable(tableDescriptor.build());
                     Assert.fail("Failure expected on an unauthorized user");
                 } catch (IOException ex) {
                     // expected
@@ -781,7 +814,7 @@ public class HBaseRangerAuthorizationTest {
                 Connection conn = ConnectionFactory.createConnection(conf);
                 Admin admin = conn.getAdmin();
 
-                admin.createTable(tableDescriptor);
+                admin.createTable(tableDescriptor.build());
 
                 conn.close();
                 return null;
@@ -806,16 +839,16 @@ public class HBaseRangerAuthorizationTest {
         conf.set("zookeeper.znode.parent", "/hbase-unsecure");
 
         // Create a new table as process owner
-        final HTableDescriptor tableDescriptor = new 
HTableDescriptor(TableName.valueOf("temp3"));
+        final TableDescriptorBuilder tableDescriptor = 
TableDescriptorBuilder.newBuilder(TableName.valueOf("temp3"));
 
         // Adding column families to table descriptor
-        tableDescriptor.addFamily(new HColumnDescriptor("colfam1"));
-        tableDescriptor.addFamily(new HColumnDescriptor("colfam2"));
+        
tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("colfam1".getBytes()).build());
+        
tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("colfam2".getBytes()).build());
 
         Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin();
 
-        admin.createTable(tableDescriptor);
+        admin.createTable(tableDescriptor.build());
 
         // Add a new row
         Put put = new Put(Bytes.toBytes("row1"));
@@ -891,16 +924,16 @@ public class HBaseRangerAuthorizationTest {
         conf.set("zookeeper.znode.parent", "/hbase-unsecure");
 
         // Create a new table as process owner
-        final HTableDescriptor tableDescriptor = new 
HTableDescriptor(TableName.valueOf("temp3"));
+        final TableDescriptorBuilder tableDescriptor = 
TableDescriptorBuilder.newBuilder(TableName.valueOf("temp3"));
 
         // Adding column families to table descriptor
-        tableDescriptor.addFamily(new HColumnDescriptor("colfam1"));
-        tableDescriptor.addFamily(new HColumnDescriptor("colfam2"));
+        
tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("colfam1".getBytes()).build());
+        
tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("colfam2".getBytes()).build());
 
         Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin();
 
-        admin.createTable(tableDescriptor);
+        admin.createTable(tableDescriptor.build());
 
         // Add a new row
         Put put = new Put(Bytes.toBytes("row1"));
@@ -975,64 +1008,431 @@ public class HBaseRangerAuthorizationTest {
         conn.close();
     }
 
-       @Test
-       public void testGetUserPermission() throws Throwable {
-               final Configuration conf = HBaseConfiguration.create();
-               conf.set("hbase.zookeeper.quorum", "localhost");
-               conf.set("hbase.zookeeper.property.clientPort", "" + port);
-               conf.set("zookeeper.znode.parent", "/hbase-unsecure");
-               String user = "IT";
-               UserGroupInformation ugi = 
UserGroupInformation.createUserForTesting(user, new String[] { "IT" });
-               ugi.doAs(new PrivilegedExceptionAction<Void>() {
-                       public Void run() throws Exception {
-                               try (Connection conn = 
ConnectionFactory.createConnection(conf)) {
-                                       
AccessControlClient.getUserPermissions(conn, "temp");
-                                       Assert.fail();
-                               } catch (Throwable e) {
-                                       // expected
-                               }
-                               return null;
-                       }
-
-               });
-
-               user = "QA";
-               ugi = UserGroupInformation.createUserForTesting(user, new 
String[] { "QA" });
-               ugi.doAs(new PrivilegedExceptionAction<Void>() {
-                       public Void run() throws Exception {
-                               List<UserPermission> userPermissions;
-                               try (Connection conn = 
ConnectionFactory.createConnection(conf)) {
-                                       userPermissions = 
AccessControlClient.getUserPermissions(conn, "@test_namespace");
-                               } catch (Throwable e) {
-                                       throw new Exception(e);
-                               }
-                               boolean found = false;
-                               for (UserPermission namespacePermission : 
userPermissions) {
-                                       if (namespacePermission.getPermission() 
instanceof NamespacePermission) {
-                                               found = 
StringUtils.equals(namespacePermission.getUser(), "@QA");
-                                               if (found) {
-                                                       break;
-                                               }
-                                       }
-                               }
-                               Assert.assertTrue("QA is not found", found);
-                               return null;
-                       }
-               });
-
-               List<UserPermission> userPermissions;
-               try (Connection conn = 
ConnectionFactory.createConnection(conf)) {
-                       userPermissions = 
AccessControlClient.getUserPermissions(conn, "temp5");
-               } catch (Throwable e) {
-                       throw new Exception(e);
-               }
-
-               UserPermission userPermission = new UserPermission("@IT",
-                               
Permission.newBuilder(TableName.valueOf("temp5")).withActions(Permission.Action.READ,
 Permission.Action.WRITE, Permission.Action.EXEC).build());
-
-               Assert.assertTrue("@IT permission should be there", 
userPermissions.contains(userPermission));
-
-       }
+    /**
+     * Exercises {@code getUserPermissions} directly on the {@link RangerAuthorizationCoprocessor} that is
+     * registered with the master of the test cluster:
+     * <ul>
+     *   <li>as group "IT", a request for table "temp" must not yield any permission;</li>
+     *   <li>as group "QA", a request for namespace "test_namespace" must yield a namespace permission
+     *       whose user is "@QA";</li>
+     *   <li>as the process owner, a request for table "temp5" must contain READ/WRITE/EXEC for "@IT".</li>
+     * </ul>
+     * Nothing is checked when the test utility reports a distributed cluster.
+     */
+    @Test
+    public void testGetUserPermission() throws Throwable {
+        if (utility.getHBaseCluster().isDistributedCluster()) {
+            // The coprocessor is fetched from the master held by the test utility; skip otherwise.
+            return;
+        }
+
+        final RangerAuthorizationCoprocessor authorizationCoprocessor =
+            utility.getHBaseCluster().getMaster().getMasterCoprocessorHost()
+                .findCoprocessor(RangerAuthorizationCoprocessor.class);
+
+        // Controller stub: every method is a no-op, it only exists to satisfy the endpoint signature.
+        final RpcController rpcController = new RpcController() {
+            @Override
+            public void reset() {
+            }
+
+            @Override
+            public boolean failed() {
+                return false;
+            }
+
+            @Override
+            public String errorText() {
+                return null;
+            }
+
+            @Override
+            public void startCancel() {
+            }
+
+            @Override
+            public void setFailed(String reason) {
+            }
+
+            @Override
+            public boolean isCanceled() {
+                return false;
+            }
+
+            @Override
+            public void notifyOnCancel(RpcCallback<Object> callback) {
+            }
+        };
+
+        // Group "IT": any permission handed to the callback for table "temp" is a failure.
+        UserGroupInformation ugi = UserGroupInformation.createUserForTesting("IT", new String[] {"IT"});
+        ugi.doAs(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                AccessControlProtos.GetUserPermissionsRequest requestTablePerms = getTableUserPermissions("temp");
+                authorizationCoprocessor.getUserPermissions(rpcController, requestTablePerms,
+                    new RpcCallback<AccessControlProtos.GetUserPermissionsResponse>() {
+                        @Override
+                        public void run(AccessControlProtos.GetUserPermissionsResponse message) {
+                            if (message == null) {
+                                return;
+                            }
+                            for (AccessControlProtos.UserPermission perm : message.getUserPermissionList()) {
+                                Assert.fail("Group IT must not receive permissions of table temp, got: "
+                                    + AccessControlUtil.toUserPermission(perm));
+                            }
+                        }
+                    });
+                return null;
+            }
+        });
+
+        // Group "QA": the namespace permissions must include an entry for "@QA".
+        ugi = UserGroupInformation.createUserForTesting("QA", new String[] {"QA"});
+        ugi.doAs(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                final List<UserPermission> userPermissions = new ArrayList<>();
+                AccessControlProtos.GetUserPermissionsRequest requestNamespacePerms = getNamespaceUserPermissions("test_namespace");
+                getUserPermissions(userPermissions, requestNamespacePerms, authorizationCoprocessor, rpcController);
+                boolean found = false;
+                for (UserPermission namespacePermission : userPermissions) {
+                    // Compare as strings: getBytes() without a charset depends on the platform default.
+                    if (namespacePermission.getPermission() instanceof NamespacePermission
+                            && "@QA".equals(namespacePermission.getUser())) {
+                        found = true;
+                        break;
+                    }
+                }
+                Assert.assertTrue("QA is not found", found);
+                return null;
+            }
+        });
+
+        // Process owner: table "temp5" must list READ/WRITE/EXEC for "@IT".
+        final List<UserPermission> userPermissions = new ArrayList<>();
+        AccessControlProtos.GetUserPermissionsRequest requestTablePerms = getTableUserPermissions("temp5");
+        getUserPermissions(userPermissions, requestTablePerms, authorizationCoprocessor, rpcController);
+        Permission expected = Permission.newBuilder(TableName.valueOf("temp5"))
+            .withActions(Permission.Action.READ, Permission.Action.WRITE, Permission.Action.EXEC).build();
+        UserPermission userPermission = new UserPermission("@IT", expected);
+        Assert.assertTrue("@IT permission should be there", userPermissions.contains(userPermission));
+    }
+    /**
+     * Writes colfam1:col1 of table "temp6" as group "IT"; the put must succeed.
+     * Per the original note this checks access to "temp6" for the policy
+     * TempPolicyForOptimizedColAuth with a non-"*" column.
+     */
+    @Test
+    public void testWriteRowAsGroupIT2() throws Exception {
+        final Configuration conf = HBaseConfiguration.create();
+        conf.set("hbase.zookeeper.quorum", "localhost");
+        conf.set("hbase.zookeeper.property.clientPort", "" + port);
+        conf.set("zookeeper.znode.parent", "/hbase-unsecure");
+
+        UserGroupInformation ugi = UserGroupInformation.createUserForTesting("IT", new String[] {"IT"});
+        ugi.doAs(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                // try-with-resources: connection and table are released even when the put throws
+                try (Connection conn = ConnectionFactory.createConnection(conf);
+                     Table table = conn.getTable(TableName.valueOf("temp6"))) {
+                    // Add a new row
+                    Put put = new Put(Bytes.toBytes("row3"));
+                    put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"), Bytes.toBytes("val2"));
+                    table.put(put);
+                }
+                return null;
+            }
+        });
+    }
+    /**
+     * Same write as {@code testWriteRowAsGroupIT2()} with column-auth optimization enabled;
+     * no behavior change is expected. The optimization is switched off again in a
+     * {@code finally} block so that a failure here cannot leak the setting into other tests.
+     */
+    @Test
+    public void testWriteRowAsGroupIT2Optimized() throws Exception {
+        enableColumnAuthOptimization(true);
+        try {
+            final Configuration conf = HBaseConfiguration.create();
+            conf.set("hbase.zookeeper.quorum", "localhost");
+            conf.set("hbase.zookeeper.property.clientPort", "" + port);
+            conf.set("zookeeper.znode.parent", "/hbase-unsecure");
+
+            UserGroupInformation ugi = UserGroupInformation.createUserForTesting("IT", new String[] {"IT"});
+            ugi.doAs(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                    try (Connection conn = ConnectionFactory.createConnection(conf);
+                         Table table = conn.getTable(TableName.valueOf("temp6"))) {
+                        // Add a new row
+                        Put put = new Put(Bytes.toBytes("row3"));
+                        put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"), Bytes.toBytes("val2"));
+                        table.put(put);
+                    }
+                    return null;
+                }
+            });
+        } finally {
+            enableColumnAuthOptimization(false);
+        }
+    }
+    /**
+     * Attempts to write colfam1:col1 of table "temp7" as group "IT"; the put must be rejected
+     * with an {@link IOException}.
+     */
+    @Test
+    public void testWriteRowDeniedAsGroupIT2() throws Exception {
+        final Configuration conf = HBaseConfiguration.create();
+        conf.set("hbase.zookeeper.quorum", "localhost");
+        conf.set("hbase.zookeeper.property.clientPort", "" + port);
+        conf.set("zookeeper.znode.parent", "/hbase-unsecure");
+
+        UserGroupInformation ugi = UserGroupInformation.createUserForTesting("IT", new String[] {"IT"});
+        ugi.doAs(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                // try-with-resources: resources are released even when Assert.fail() fires
+                try (Connection conn = ConnectionFactory.createConnection(conf);
+                     Table table = conn.getTable(TableName.valueOf("temp7"))) {
+                    try {
+                        // Add a new row
+                        Put put = new Put(Bytes.toBytes("row3"));
+                        put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"), Bytes.toBytes("val2"));
+                        table.put(put);
+                        Assert.fail("Failure expected on an unauthorized user");
+                    } catch (IOException ex) {
+                        // expected
+                    }
+                }
+                return null;
+            }
+        });
+    }
+    /**
+     * Same denied write as {@code testWriteRowDeniedAsGroupIT2()} with column-auth optimization
+     * enabled; no behavior change is expected. The optimization is switched off again in a
+     * {@code finally} block so that a failure here cannot leak the setting into other tests.
+     */
+    @Test
+    public void testWriteRowDeniedAsGroupIT2Optimized() throws Exception {
+        enableColumnAuthOptimization(true);
+        try {
+            final Configuration conf = HBaseConfiguration.create();
+            conf.set("hbase.zookeeper.quorum", "localhost");
+            conf.set("hbase.zookeeper.property.clientPort", "" + port);
+            conf.set("zookeeper.znode.parent", "/hbase-unsecure");
+
+            UserGroupInformation ugi = UserGroupInformation.createUserForTesting("IT", new String[] {"IT"});
+            ugi.doAs(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                    try (Connection conn = ConnectionFactory.createConnection(conf);
+                         Table table = conn.getTable(TableName.valueOf("temp7"))) {
+                        try {
+                            // Add a new row
+                            Put put = new Put(Bytes.toBytes("row3"));
+                            put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"), Bytes.toBytes("val2"));
+                            table.put(put);
+                            Assert.fail("Failure expected on an unauthorized user");
+                        } catch (IOException ex) {
+                            // expected
+                        }
+                    }
+                    return null;
+                }
+            });
+        } finally {
+            enableColumnAuthOptimization(false);
+        }
+    }
+    /**
+     * Scans table "temp8" as two different groups and checks how many rows each one gets back.
+     * An {@link IOException} from the scan is no longer swallowed: the row-count assertions only
+     * mean something if the scan actually ran.
+     */
+    @Test
+    public void testScanTableAsGroupIT() throws Exception {
+        final Configuration conf = HBaseConfiguration.create();
+        conf.set("hbase.zookeeper.quorum", "localhost");
+        conf.set("hbase.zookeeper.property.clientPort", "" + port);
+        conf.set("zookeeper.znode.parent", "/hbase-unsecure");
+
+        // while there are 2 rows in this table, one of the columns is explicitly denied so only one row should be in the result
+        assertScannedRowCount(conf, "IT", 1);
+
+        // there are 2 rows in this table, group IT2 does not have any denied columns
+        assertScannedRowCount(conf, "IT2", 2);
+    }
+
+    /**
+     * Runs a full scan of table "temp8" as a test user named {@code group} that belongs to the group
+     * of the same name, and asserts the number of rows returned by the scanner.
+     *
+     * @param conf         client configuration used to open the connection
+     * @param group        name used both as the user name and as its single group
+     * @param expectedRows number of rows the scan must return
+     * @throws Exception if the scan fails or the privileged action is interrupted
+     */
+    private static void assertScannedRowCount(final Configuration conf, final String group, final int expectedRows) throws Exception {
+        UserGroupInformation ugi = UserGroupInformation.createUserForTesting(group, new String[] {group});
+        ugi.doAs(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                // connection, table and scanner are all released, even when the assertion fails
+                try (Connection conn = ConnectionFactory.createConnection(conf);
+                     Table table = conn.getTable(TableName.valueOf("temp8"));
+                     ResultScanner scanner = table.getScanner(new Scan())) {
+                    int numRowsInResult = 0;
+                    for (Result result = scanner.next(); result != null; result = scanner.next()) {
+                        System.out.println("Found row : " + result);
+                        numRowsInResult += 1;
+                    }
+                    Assert.assertEquals(expectedRows, numRowsInResult);
+                }
+                return null;
+            }
+        });
+    }
+
+
+    /**
+     * Attempts to write colfam1:col1 of table "temp" as group "public" with column-auth
+     * optimization enabled; the put must be rejected with an {@link IOException}.
+     * The optimization is switched off again in a {@code finally} block so that a failure here
+     * cannot leak the setting into other tests.
+     */
+    @Test
+    public void testWriteRowAsGroupPublicOptimized() throws Exception {
+        enableColumnAuthOptimization(true); // enable optimization
+        try {
+            final Configuration conf = HBaseConfiguration.create();
+            conf.set("hbase.zookeeper.quorum", "localhost");
+            conf.set("hbase.zookeeper.property.clientPort", "" + port);
+            conf.set("zookeeper.znode.parent", "/hbase-unsecure");
+
+            UserGroupInformation ugi = UserGroupInformation.createUserForTesting("public", new String[] {"public"});
+            ugi.doAs(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                    try (Connection conn = ConnectionFactory.createConnection(conf);
+                         Table table = conn.getTable(TableName.valueOf("temp"))) {
+                        // Add a new row
+                        try {
+                            Put put = new Put(Bytes.toBytes("row3"));
+                            put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"), Bytes.toBytes("val2"));
+                            table.put(put);
+                            Assert.fail("Failure expected on an unauthorized user");
+                        } catch (IOException ex) {
+                            // expected
+                        }
+                    }
+                    return null;
+                }
+            });
+        } finally {
+            enableColumnAuthOptimization(false); // disable optimization after test case complete
+        }
+    }
+    /**
+     * Repeats the scenario of {@code testTagBasedColumnPolicy()} with column-auth optimization enabled.
+     * Per the original note, there should not be any behavior change: tags will exist for the accessed
+     * resource, so authorization should fall back to the default behavior.
+     * <p>
+     * Steps: create table "temp3" with two column families and seed it as the process owner; as group
+     * "dev" a put to colfam1:col1 must succeed and a put to colfam1:col2 must fail; as group "IT" a put
+     * to colfam1:col1 must fail; finally the table is dropped. The optimization is switched off again
+     * in a {@code finally} block so that a failure here cannot leak the setting into other tests.
+     */
+    @Test
+    public void testTagBasedColumnPolicyOptimized() throws Exception {
+        enableColumnAuthOptimization(true);
+        try {
+            final Configuration conf = HBaseConfiguration.create();
+            conf.set("hbase.zookeeper.quorum", "localhost");
+            conf.set("hbase.zookeeper.property.clientPort", "" + port);
+            conf.set("zookeeper.znode.parent", "/hbase-unsecure");
+
+            final TableName tableName = TableName.valueOf("temp3");
+
+            // Create a new table as process owner
+            TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
+
+            // Adding column families to table descriptor
+            // (Bytes.toBytes instead of String.getBytes(): no dependency on the platform charset)
+            tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("colfam1")).build());
+            tableDescriptor.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("colfam2")).build());
+
+            try (Connection conn = ConnectionFactory.createConnection(conf);
+                 Admin admin = conn.getAdmin()) {
+                admin.createTable(tableDescriptor.build());
+
+                try (Table table = conn.getTable(tableName)) {
+                    // Add a new row
+                    Put put = new Put(Bytes.toBytes("row1"));
+                    put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"), Bytes.toBytes("val1"));
+                    table.put(put);
+
+                    put = new Put(Bytes.toBytes("row1"));
+                    put.addColumn(Bytes.toBytes("colfam2"), Bytes.toBytes("col1"), Bytes.toBytes("val2"));
+                    table.put(put);
+                }
+            }
+
+            UserGroupInformation ugi = UserGroupInformation.createUserForTesting("dev", new String[] {"dev"});
+            ugi.doAs(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                    try (Connection conn = ConnectionFactory.createConnection(conf);
+                         Table table = conn.getTable(tableName)) {
+                        // Try to write something to the "col1" column of the "colfam1" of the "temp3" table as the "dev" group
+                        // - this should work
+                        Put put = new Put(Bytes.toBytes("row3"));
+                        put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"), Bytes.toBytes("val2"));
+                        table.put(put);
+
+                        // Try to write something to the "col2" column of the "colfam1" of the "temp3" table as the "dev" group
+                        // - this should fail
+                        put = new Put(Bytes.toBytes("row3"));
+                        put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col2"), Bytes.toBytes("val2"));
+                        try {
+                            table.put(put);
+                            Assert.fail("Failure expected on an unauthorized user");
+                        } catch (IOException ex) {
+                            // expected
+                        }
+                    }
+                    return null;
+                }
+            });
+
+            ugi = UserGroupInformation.createUserForTesting("IT", new String[] {"IT"});
+            ugi.doAs(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                    try (Connection conn = ConnectionFactory.createConnection(conf);
+                         Table table = conn.getTable(tableName)) {
+                        // Try to write something to the "col1" column of the "colfam1" of the "temp3" table as the "IT" group
+                        // - this should fail
+                        Put put = new Put(Bytes.toBytes("row3"));
+                        put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"), Bytes.toBytes("val2"));
+                        try {
+                            table.put(put);
+                            Assert.fail("Failure expected on an unauthorized user");
+                        } catch (IOException ex) {
+                            // expected
+                        }
+                    }
+                    return null;
+                }
+            });
+
+            // Drop the table
+            try (Connection conn = ConnectionFactory.createConnection(conf);
+                 Admin admin = conn.getAdmin()) {
+                admin.disableTable(tableName);
+                admin.deleteTable(tableName);
+            }
+        } finally {
+            enableColumnAuthOptimization(false);
+        }
+    }
+
+    /**
+     * Toggles column-auth optimization on the {@link RangerAuthorizationCoprocessor} that is
+     * registered with the master of the test cluster.
+     *
+     * @param enable value passed to {@code setColumnAuthOptimizationEnabled}
+     * @throws RuntimeException wrapping any exception raised by the setter
+     */
+    private static void enableColumnAuthOptimization(boolean enable) {
+        final RangerAuthorizationCoprocessor coprocessor = utility.getHBaseCluster()
+            .getMaster()
+            .getMasterCoprocessorHost()
+            .findCoprocessor(RangerAuthorizationCoprocessor.class);
+        try {
+            coprocessor.setColumnAuthOptimizationEnabled(enable);
+        } catch (Exception cause) {
+            throw new RuntimeException(cause);
+        }
+    }
+
+    /**
+     * Builds a {@code GetUserPermissionsRequest} of type {@code Namespace} for the given namespace.
+     *
+     * @param namespace namespace name, encoded as UTF-8 into the request
+     * @return the built request
+     */
+    private AccessControlProtos.GetUserPermissionsRequest getNamespaceUserPermissions(String namespace) {
+        return AccessControlProtos.GetUserPermissionsRequest.newBuilder()
+            .setNamespaceName(ByteString.copyFromUtf8(namespace))
+            .setType(AccessControlProtos.Permission.Type.Namespace)
+            .build();
+    }
+
+    /**
+     * Builds a {@code GetUserPermissionsRequest} of type {@code Table} for the given table.
+     *
+     * @param tableName name of the table, converted with {@code TableName.valueOf}
+     * @return the built request
+     */
+    private AccessControlProtos.GetUserPermissionsRequest getTableUserPermissions(String tableName) {
+        return AccessControlProtos.GetUserPermissionsRequest.newBuilder()
+            .setTableName(ProtobufUtil.toProtoTableName(TableName.valueOf(tableName)))
+            .setType(AccessControlProtos.Permission.Type.Table)
+            .build();
+    }
+
+    /**
+     * Calls {@code getUserPermissions} on the coprocessor and appends every permission of the
+     * response, converted with {@code AccessControlUtil.toUserPermission}, to {@code userPermissions}.
+     * A {@code null} response adds nothing.
+     *
+     * @param userPermissions          list that receives the converted permissions
+     * @param requestTablePerms        request forwarded to the coprocessor
+     * @param authorizationCoprocessor coprocessor instance to query
+     * @param rpcController            controller forwarded to the coprocessor
+     */
+    private void getUserPermissions(List<UserPermission> userPermissions, AccessControlProtos.GetUserPermissionsRequest requestTablePerms, RangerAuthorizationCoprocessor authorizationCoprocessor, RpcController rpcController) {
+        RpcCallback<AccessControlProtos.GetUserPermissionsResponse> collector =
+            new RpcCallback<AccessControlProtos.GetUserPermissionsResponse>() {
+                @Override
+                public void run(AccessControlProtos.GetUserPermissionsResponse response) {
+                    if (response == null) {
+                        return;
+                    }
+                    for (AccessControlProtos.UserPermission protoPermission : response.getUserPermissionList()) {
+                        userPermissions.add(AccessControlUtil.toUserPermission(protoPermission));
+                    }
+                }
+            };
+        authorizationCoprocessor.getUserPermissions(rpcController, requestTablePerms, collector);
+    }
 
     private static int getFreePort() throws IOException {
         ServerSocket serverSocket = new ServerSocket(0);
@@ -1040,4 +1440,5 @@ public class HBaseRangerAuthorizationTest {
         serverSocket.close();
         return port;
     }
-}
+
+}
\ No newline at end of file
diff --git 
a/hbase-agent/src/test/java/org/apache/ranger/authorization/hbase/RangerAuthorizationFilterTest.java
 
b/hbase-agent/src/test/java/org/apache/ranger/authorization/hbase/RangerAuthorizationFilterTest.java
index 6b97c3064..fffc7943d 100644
--- 
a/hbase-agent/src/test/java/org/apache/ranger/authorization/hbase/RangerAuthorizationFilterTest.java
+++ 
b/hbase-agent/src/test/java/org/apache/ranger/authorization/hbase/RangerAuthorizationFilterTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -54,7 +55,7 @@ public class RangerAuthorizationFilterTest {
 
                // auth session
                AuthorizationSession session = createSessionMock();
-               RangerAuthorizationFilter filter = new 
RangerAuthorizationFilter(session, familiesAccessAllowed, familiesAccessDenied, 
familiesAccessIndeterminate, columnsAccessAllowed);
+               RangerAuthorizationFilter filter = new 
RangerAuthorizationFilter(session, familiesAccessAllowed, familiesAccessDenied, 
familiesAccessIndeterminate, columnsAccessAllowed, new HashSet<>());
 
                // evaluate access for various types of cases
                Cell aCell = mock(Cell.class);
diff --git a/hbase-agent/src/test/resources/hbase-policies.json 
b/hbase-agent/src/test/resources/hbase-policies.json
index 61960c0af..8ee73b1e0 100644
--- a/hbase-agent/src/test/resources/hbase-policies.json
+++ b/hbase-agent/src/test/resources/hbase-policies.json
@@ -245,6 +245,101 @@
       "id": 38,
       "isEnabled": true,
       "version": 1
+    },
+    {
+      "service": "cl1_hbase", "name": "TempPolicyForOptimizedColAuth", 
"policyType": 0, "description": "", "isAuditEnabled": true,
+      "resources": {
+        "column-family": {"values": ["colfam1"], "isExcludes": false, 
"isRecursive": false },
+        "column": {"values": ["col1"], "isExcludes": false, "isRecursive": 
false},
+        "table": {"values": ["temp6"], "isExcludes": false, "isRecursive": 
false}
+      },
+      "policyItems": [
+        {
+          "accesses": [
+            {"type": "read", "isAllowed": true},
+            {"type": "write", "isAllowed": true},
+            {"type": "execute", "isAllowed": true}
+          ],
+          "users": [], "groups": ["IT"], "conditions": [], "delegateAdmin": 
false
+        }
+      ],
+      "id": 101, "isEnabled": true, "version": 1
+    },
+    {
+      "service": "cl1_hbase", "name": "AllowColFamWhenColDeniedForOptimized", 
"policyType": 0, "description": "", "isAuditEnabled": true,
+      "resources": {
+        "column-family": {"values": ["colfam1"], "isExcludes": false, 
"isRecursive": false},
+        "column": {"values": ["*"], "isExcludes": false, "isRecursive": false},
+        "table": {"values": ["temp7"], "isExcludes": false, "isRecursive": 
false}
+      },
+      "policyItems": [
+        {
+          "accesses": [
+            {"type": "read", "isAllowed": true},
+            {"type": "write", "isAllowed": true},
+            {"type": "execute", "isAllowed": true}
+          ],
+          "users": [], "groups": ["IT"], "conditions": [], "delegateAdmin": 
false
+        }
+      ],
+      "id": 102, "isEnabled": true, "version": 1
+    },
+    {
+      "service": "cl1_hbase", "name": 
"DenyColWhenColFamAllowedForOptimizedCheck", "policyType": 0, "description": 
"", "isAuditEnabled": true,
+      "resources": {
+        "column-family": {"values": ["colfam1"], "isExcludes": false, 
"isRecursive": false},
+        "column": {"values": ["col1"], "isExcludes": false, "isRecursive": 
false},
+        "table": {"values": ["temp7"], "isExcludes": false, "isRecursive": 
false}
+      },
+      "denyPolicyItems": [
+        {
+          "accesses": [
+            {"type": "read", "isAllowed": true},
+            {"type": "write", "isAllowed": true},
+            {"type": "execute", "isAllowed": true}
+          ],
+          "users": [], "groups": ["IT"], "conditions": [], "delegateAdmin": 
false
+        }
+      ],
+      "id": 103, "isEnabled": true, "version": 1
+    },
+    {
+      "service": "cl1_hbase", "name": "PolicyForAllowColumnFamilyForScan", 
"policyType": 0, "description": "", "isAuditEnabled": true,
+      "resources": {
+        "column-family": {"values": ["colfam1"], "isExcludes": false, 
"isRecursive": false},
+        "column": {"values": ["*"], "isExcludes": false, "isRecursive": false},
+        "table": {"values": ["temp8"], "isExcludes": false, "isRecursive": 
false}
+      },
+      "policyItems": [
+        {
+          "accesses": [
+            {"type": "read", "isAllowed": true},
+            {"type": "write", "isAllowed": true},
+            {"type": "execute", "isAllowed": true}
+          ],
+          "users": [], "groups": ["IT","IT2"], "conditions": [], 
"delegateAdmin": false
+        }
+      ],
+      "id": 104, "isEnabled": true, "version": 1
+    },
+    {
+      "service": "cl1_hbase", "name": "PolicyForDenyColumnForScan", 
"policyType": 0, "description": "", "isAuditEnabled": true,
+      "resources": {
+        "column-family": {"values": ["colfam1"], "isExcludes": false, 
"isRecursive": false},
+        "column": {"values": ["col1"], "isExcludes": false, "isRecursive": 
false},
+        "table": {"values": ["temp8"], "isExcludes": false, "isRecursive": 
false}
+      },
+      "denyPolicyItems": [
+        {
+          "accesses": [
+            {"type": "read", "isAllowed": true},
+            {"type": "write", "isAllowed": true},
+            {"type": "execute", "isAllowed": true}
+          ],
+          "users": [], "groups": ["IT"], "conditions": [], "delegateAdmin": 
false
+        }
+      ],
+      "id": 105, "isEnabled": true, "version": 1
     }
   ],
   "serviceDef": {
@@ -1317,4 +1412,4 @@
     },
     "auditMode": "audit-default"
   }
-}
+}
\ No newline at end of file


Reply via email to