This is an automated email from the ASF dual-hosted git repository.

abhi pushed a commit to branch ranger-5039
in repository https://gitbox.apache.org/repos/asf/ranger.git

commit 788f25ea318440fc6cc366debd3e95a078d19e4d
Author: Abhishek Kumar <[email protected]>
AuthorDate: Wed Dec 18 00:50:23 2024 -0800

    RANGER-5039: checkstyle compliance updates - plugin-kafka module
---
 plugin-kafka/pom.xml                               |   2 +
 .../kafka/authorizer/RangerKafkaAuditHandler.java  |  86 ++--
 .../kafka/authorizer/RangerKafkaAuthorizer.java    | 557 ++++++++++-----------
 .../ranger/services/kafka/RangerServiceKafka.java  | 192 ++++---
 .../services/kafka/client/ServiceKafkaClient.java  | 398 +++++++--------
 .../kafka/client/ServiceKafkaConnectionMgr.java    | 127 +++--
 .../authorizer/KafkaRangerAuthorizerGSSTest.java   | 189 ++++---
 .../KafkaRangerAuthorizerSASLSSLTest.java          | 126 +++--
 .../authorizer/KafkaRangerAuthorizerTest.java      | 145 +++---
 .../authorizer/KafkaRangerTopicCreationTest.java   | 116 +++--
 .../kafka/authorizer/KafkaTestUtils.java           | 103 ++--
 .../kafka/authorizer/RangerAdminClientImpl.java    |  28 +-
 12 files changed, 996 insertions(+), 1073 deletions(-)

diff --git a/plugin-kafka/pom.xml b/plugin-kafka/pom.xml
index 5e64e2d28..9e2db449b 100644
--- a/plugin-kafka/pom.xml
+++ b/plugin-kafka/pom.xml
@@ -28,6 +28,8 @@
     <name>KAFKA Security Plugin</name>
     <description>KAFKA Security Plugin</description>
     <properties>
+        <checkstyle.failOnViolation>true</checkstyle.failOnViolation>
+        <checkstyle.skip>false</checkstyle.skip>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     </properties>
     <dependencies>
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java
index f16fbd6b6..361a50e22 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-
 package org.apache.ranger.authorization.kafka.authorizer;
 
 import org.apache.ranger.audit.model.AuthzAuditEvent;
@@ -33,80 +32,73 @@ import java.util.Collection;
 public class RangerKafkaAuditHandler extends RangerDefaultAuditHandler {
     private static final Logger LOG = 
LoggerFactory.getLogger(RangerKafkaAuditHandler.class);
 
-    private AuthzAuditEvent auditEvent      = null;
+    private AuthzAuditEvent auditEvent;
 
-    public RangerKafkaAuditHandler(){
+    public RangerKafkaAuditHandler() {
     }
 
     @Override
     public void processResult(RangerAccessResult result) {
         // If Cluster Resource Level Topic Creation is not Allowed we don't 
audit.
         // Subsequent call from Kafka for Topic Creation at Topic resource 
Level will be audited.
-        if(LOG.isTraceEnabled()) {
-            LOG.trace("==> RangerKafkaAuditHandler.processResult()");
-        }
+        LOG.trace("==> RangerKafkaAuditHandler.processResult()");
+
         if (!isAuditingNeeded(result)) {
             return;
         }
+
         auditEvent = super.getAuthzEvents(result);
-        if(LOG.isTraceEnabled()) {
-            LOG.trace("<== RangerKafkaAuditHandler.processResult()");
-        }
+
+        LOG.trace("<== RangerKafkaAuditHandler.processResult()");
     }
+
     @Override
     public void processResults(Collection<RangerAccessResult> results) {
-        if(LOG.isTraceEnabled()) {
-            LOG.trace("==> RangerKafkaAuditHandler.processResults(" + results 
+ ")");
-        }
-        if (results!=null){
-            for(RangerAccessResult res: results){
+        LOG.trace("==> RangerKafkaAuditHandler.processResults({})", results);
+
+        if (results != null) {
+            for (RangerAccessResult res : results) {
                 processResult(res);
                 flushAudit();
             }
         }
 
-        if(LOG.isTraceEnabled()) {
-            LOG.trace("<== RangerKafkaAuditHandler.processResults(" + results 
+ ")");
-        }
+        LOG.trace("<== RangerKafkaAuditHandler.processResults({})", results);
     }
 
+    public void flushAudit() {
+        LOG.trace("==> RangerKafkaAuditHandler.flushAudit(AuditEvent: {})", 
auditEvent);
 
-    private boolean isAuditingNeeded(final RangerAccessResult result) {
-        if(LOG.isTraceEnabled()) {
-            LOG.trace("==> RangerKafkaAuditHandler.isAuditingNeeded()");
+        if (auditEvent != null) {
+            super.logAuthzAudit(auditEvent);
         }
-        boolean ret = true;
-        boolean                            isAllowed = result.getIsAllowed();
-        RangerAccessRequest       request = result.getAccessRequest();
-        RangerAccessResourceImpl resource = (RangerAccessResourceImpl) 
request.getResource();
-        String resourceName                      = (String) 
resource.getValue(RangerKafkaAuthorizer.KEY_CLUSTER);
+
+        LOG.trace("<== RangerKafkaAuditHandler.flushAudit()");
+    }
+
+    private boolean isAuditingNeeded(final RangerAccessResult result) {
+        LOG.trace("==> RangerKafkaAuditHandler.isAuditingNeeded()");
+
+        boolean                  ret          = true;
+        boolean                  isAllowed    = result.getIsAllowed();
+        RangerAccessRequest      request      = result.getAccessRequest();
+        RangerAccessResourceImpl resource     = (RangerAccessResourceImpl) 
request.getResource();
+        String                   resourceName = (String) 
resource.getValue(RangerKafkaAuthorizer.KEY_CLUSTER);
+
         if (resourceName != null) {
             if 
(request.getAccessType().equalsIgnoreCase(RangerKafkaAuthorizer.ACCESS_TYPE_CREATE)
 && !isAllowed) {
                 ret = false;
             }
         }
-        if(LOG.isTraceEnabled()) {
-            LOG.trace("RangerKafkaAuditHandler: isAuditingNeeded()");
-            LOG.trace("request:"+request);
-            LOG.trace("resource:"+resource);
-            LOG.trace("resourceName:"+resourceName);
-            LOG.trace("request.getAccessType():"+request.getAccessType());
-            LOG.trace("isAllowed:"+isAllowed);
-            LOG.trace("ret="+ret);
-            LOG.trace("<== RangerKafkaAuditHandler.isAuditingNeeded() = 
"+ret+" for result="+result);
-        }
-        return ret;
-    }
 
-    public void flushAudit() {
-        if(LOG.isTraceEnabled()) {
-            LOG.trace("==> RangerKafkaAuditHandler.flushAudit(" + "AuditEvent: 
" + auditEvent+")");
-        }
-        if (auditEvent != null) {
-            super.logAuthzAudit(auditEvent);
-        }
-        if(LOG.isTraceEnabled()) {
-            LOG.trace("<== RangerKafkaAuditHandler.flushAudit()");
-        }
+        LOG.trace("RangerKafkaAuditHandler: isAuditingNeeded()");
+        LOG.trace("request: {}", request);
+        LOG.trace("resource: {}", resource);
+        LOG.trace("resourceName: {}", resourceName);
+        LOG.trace("request.getAccessType(): {}", request.getAccessType());
+        LOG.trace("isAllowed: {}", isAllowed);
+        LOG.trace("ret = {}", ret);
+        LOG.trace("<== RangerKafkaAuditHandler.isAuditingNeeded() = {} for 
result = {}", ret, result);
+        return ret;
     }
 }
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index 96a36abe9..8b83c9775 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -19,17 +19,6 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.stream.Collectors;
-
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -60,291 +49,301 @@ import org.apache.ranger.plugin.util.RangerPerfTracer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
+
 public class RangerKafkaAuthorizer implements Authorizer {
-  private static final Logger logger = 
LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
-  private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = 
RangerPerfTracer.getPerfLogger("kafkaauth.request");
-
-  public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs";
-  public static final String KEY_TOPIC = "topic";
-  public static final String KEY_CLUSTER = "cluster";
-  public static final String KEY_CONSUMER_GROUP = "consumergroup";
-  public static final String KEY_TRANSACTIONALID = "transactionalid";
-  public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
-  public static final String ACCESS_TYPE_READ = "consume";
-  public static final String ACCESS_TYPE_WRITE = "publish";
-  public static final String ACCESS_TYPE_CREATE = "create";
-  public static final String ACCESS_TYPE_DELETE = "delete";
-  public static final String ACCESS_TYPE_CONFIGURE = "configure";
-  public static final String ACCESS_TYPE_DESCRIBE = "describe";
-  public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
-  public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action";
-  public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
-
-  private static volatile RangerBasePlugin rangerPlugin = null;
-  RangerKafkaAuditHandler auditHandler = null;
-
-  public RangerKafkaAuthorizer() {
-  }
-
-  private static String mapToRangerAccessType(AclOperation operation) {
-    switch (operation) {
-      case READ:
-        return ACCESS_TYPE_READ;
-      case WRITE:
-        return ACCESS_TYPE_WRITE;
-      case ALTER:
-        return ACCESS_TYPE_CONFIGURE;
-      case DESCRIBE:
-        return ACCESS_TYPE_DESCRIBE;
-      case CLUSTER_ACTION:
-        return ACCESS_TYPE_CLUSTER_ACTION;
-      case CREATE:
-        return ACCESS_TYPE_CREATE;
-      case DELETE:
-        return ACCESS_TYPE_DELETE;
-      case DESCRIBE_CONFIGS:
-        return ACCESS_TYPE_DESCRIBE_CONFIGS;
-      case ALTER_CONFIGS:
-        return ACCESS_TYPE_ALTER_CONFIGS;
-      case IDEMPOTENT_WRITE:
-        return ACCESS_TYPE_IDEMPOTENT_WRITE;
-      case UNKNOWN:
-      case ANY:
-      case ALL:
-      default:
-        return null;
+    private static final Logger logger                      = 
LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+    private static final Logger PERF_KAFKAAUTH_REQUEST_LOG  = 
RangerPerfTracer.getPerfLogger("kafkaauth.request");
+
+    public static final String ACCESS_TYPE_ALTER_CONFIGS    = "alter_configs";
+    public static final String KEY_TOPIC                    = "topic";
+    public static final String KEY_CLUSTER                  = "cluster";
+    public static final String KEY_CONSUMER_GROUP           = "consumergroup";
+    public static final String KEY_TRANSACTIONALID          = 
"transactionalid";
+    public static final String KEY_DELEGATIONTOKEN          = 
"delegationtoken";
+    public static final String ACCESS_TYPE_READ             = "consume";
+    public static final String ACCESS_TYPE_WRITE            = "publish";
+    public static final String ACCESS_TYPE_CREATE           = "create";
+    public static final String ACCESS_TYPE_DELETE           = "delete";
+    public static final String ACCESS_TYPE_CONFIGURE        = "configure";
+    public static final String ACCESS_TYPE_DESCRIBE         = "describe";
+    public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = 
"describe_configs";
+    public static final String ACCESS_TYPE_CLUSTER_ACTION   = "cluster_action";
+    public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = 
"idempotent_write";
+
+    private static volatile RangerBasePlugin rangerPlugin;
+
+    RangerKafkaAuditHandler auditHandler;
+
+    public RangerKafkaAuthorizer() {
+    }
+
+    @Override
+    public void close() {
+        logger.info("close() called on authorizer.");
+        try {
+            if (rangerPlugin != null) {
+                rangerPlugin.cleanup();
+            }
+        } catch (Throwable t) {
+            logger.error("Error closing RangerPlugin.", t);
+        }
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        RangerBasePlugin me = rangerPlugin;
+        if (me == null) {
+            synchronized (RangerKafkaAuthorizer.class) {
+                me = rangerPlugin;
+                if (me == null) {
+                    try {
+                        // Possible to override JAAS configuration which is 
used by Ranger, otherwise
+                        // SASL_PLAINTEXT is used, which force Kafka to use 
'sasl_plaintext.KafkaServer',
+                        // if it's not defined, then it reverts to 
'KafkaServer' configuration.
+                        final Object jaasContext = 
configs.get("ranger.jaas.context");
+                        final String listenerName = (jaasContext instanceof 
String
+                                && StringUtils.isNotEmpty((String) 
jaasContext)) ? (String) jaasContext
+                                : SecurityProtocol.SASL_PLAINTEXT.name();
+                        final String saslMechanism = 
SaslConfigs.GSSAPI_MECHANISM;
+                        JaasContext  context       = 
JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, 
configs);
+                        MiscUtil.setUGIFromJAASConfig(context.name());
+                        UserGroupInformation loginUser = 
MiscUtil.getUGILoginUser();
+                        logger.info("LoginUser = {}", loginUser);
+                    } catch (Throwable t) {
+                        logger.error("Error getting principal.", t);
+                    }
+                    rangerPlugin = new RangerBasePlugin("kafka", "kafka");
+                    logger.info("Calling plugin.init()");
+                    rangerPlugin.init();
+                    auditHandler = new RangerKafkaAuditHandler();
+                    rangerPlugin.setResultProcessor(auditHandler);
+                }
+            }
+        }
+    }
+
+    @Override
+    public Map<Endpoint, ? extends CompletionStage<Void>> 
start(AuthorizerServerInfo serverInfo) {
+        return serverInfo.endpoints().stream()
+                .collect(Collectors.toMap(endpoint -> endpoint, endpoint -> 
CompletableFuture.completedFuture(null), (a, b) -> b));
+    }
+
+    @Override
+    public List<AuthorizationResult> authorize(AuthorizableRequestContext 
requestContext, List<Action> actions) {
+        if (rangerPlugin == null) {
+            MiscUtil.logErrorMessageByInterval(logger, "Authorizer is still 
not initialized");
+            return denyAll(actions);
+        }
+
+        RangerPerfTracer perf = null;
+        if (RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
+            perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, 
"RangerKafkaAuthorizer.authorize(actions=" + actions + ")");
+        }
+        try {
+            return wrappedAuthorization(requestContext, actions);
+        } finally {
+            RangerPerfTracer.log(perf);
+        }
     }
-  }
-
-  private static String mapToResourceType(ResourceType resourceType) {
-    switch (resourceType) {
-      case TOPIC:
-        return KEY_TOPIC;
-      case CLUSTER:
-        return KEY_CLUSTER;
-      case GROUP:
-        return KEY_CONSUMER_GROUP;
-      case TRANSACTIONAL_ID:
-        return KEY_TRANSACTIONALID;
-      case DELEGATION_TOKEN:
-        return KEY_DELEGATIONTOKEN;
-      case ANY:
-      case UNKNOWN:
-      default:
-        return null;
+
+    @Override
+    public List<? extends CompletionStage<AclCreateResult>> 
createAcls(AuthorizableRequestContext requestContext, List<AclBinding> 
aclBindings) {
+        logger.error("createAcls is not supported by Ranger for Kafka");
+
+        return aclBindings.stream()
+                .map(ab -> {
+                    CompletableFuture<AclCreateResult> completableFuture = new 
CompletableFuture<>();
+                    completableFuture.completeExceptionally(new 
UnsupportedOperationException("createAcls is not supported by Ranger for 
Kafka"));
+                    return completableFuture;
+                })
+                .collect(Collectors.toList());
     }
-  }
-
-  private static RangerAccessResourceImpl createRangerAccessResource(String 
resourceTypeKey, String resourceName) {
-    RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
-    rangerResource.setValue(resourceTypeKey, resourceName);
-    return rangerResource;
-  }
-
-  private static RangerAccessRequestImpl createRangerAccessRequest(String 
userName,
-                                                                   Set<String> 
userGroups,
-                                                                   String ip,
-                                                                   Date 
eventTime,
-                                                                   String 
resourceTypeKey,
-                                                                   String 
resourceName,
-                                                                   String 
accessType) {
-    RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
-    rangerRequest.setResource(createRangerAccessResource(resourceTypeKey, 
resourceName));
-    rangerRequest.setUser(userName);
-    rangerRequest.setUserGroups(userGroups);
-    rangerRequest.setClientIPAddress(ip);
-    rangerRequest.setAccessTime(eventTime);
-    rangerRequest.setAccessType(accessType);
-    rangerRequest.setAction(accessType);
-    rangerRequest.setRequestData(resourceName);
-    return rangerRequest;
-  }
-
-  private static List<AuthorizationResult> denyAll(List<Action> actions) {
-    return actions.stream().map(a -> 
AuthorizationResult.DENIED).collect(Collectors.toList());
-  }
-
-  private static List<AuthorizationResult> mapResults(List<Action> actions, 
Collection<RangerAccessResult> results) {
-    if (CollectionUtils.isEmpty(results)) {
-      logger.error("Ranger Plugin returned null or empty. Returning Denied for 
all");
-      return denyAll(actions);
+
+    @Override
+    public List<? extends CompletionStage<AclDeleteResult>> 
deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> 
aclBindingFilters) {
+        logger.error("deleteAcls is not supported by Ranger for Kafka");
+        return aclBindingFilters.stream()
+                .map(ab -> {
+                    CompletableFuture<AclDeleteResult> completableFuture = new 
CompletableFuture<>();
+                    completableFuture.completeExceptionally(new 
UnsupportedOperationException("deleteAcls is not supported by Ranger for 
Kafka"));
+                    return completableFuture;
+                })
+                .collect(Collectors.toList());
     }
-    return results.stream()
-        .map(r -> r != null && r.getIsAllowed() ? AuthorizationResult.ALLOWED 
: AuthorizationResult.DENIED)
-        .collect(Collectors.toList());
-  }
-
-  private static String toString(AuthorizableRequestContext requestContext) {
-    return requestContext == null ? null :
-        String.format("AuthorizableRequestContext{principal=%s, 
clientAddress=%s, clientId=%s}",
-            requestContext.principal(), requestContext.clientAddress(), 
requestContext.clientId());
-  }
-
-  @Override
-  public void close() {
-    logger.info("close() called on authorizer.");
-    try {
-      if (rangerPlugin != null) {
-        rangerPlugin.cleanup();
-      }
-    } catch (Throwable t) {
-      logger.error("Error closing RangerPlugin.", t);
+
+    @Override
+    public Iterable<AclBinding> acls(AclBindingFilter filter) {
+        logger.error("(getting) acls is not supported by Ranger for Kafka");
+        throw new UnsupportedOperationException("(getting) acls is not 
supported by Ranger for Kafka");
     }
-  }
-
-  @Override
-  public void configure(Map<String, ?> configs) {
-    RangerBasePlugin me = rangerPlugin;
-    if (me == null) {
-      synchronized (RangerKafkaAuthorizer.class) {
-        me = rangerPlugin;
-        if (me == null) {
-          try {
-            // Possible to override JAAS configuration which is used by 
Ranger, otherwise
-            // SASL_PLAINTEXT is used, which force Kafka to use 
'sasl_plaintext.KafkaServer',
-            // if it's not defined, then it reverts to 'KafkaServer' 
configuration.
-            final Object jaasContext = configs.get("ranger.jaas.context");
-            final String listenerName = (jaasContext instanceof String
-                && StringUtils.isNotEmpty((String) jaasContext)) ? (String) 
jaasContext
-                : SecurityProtocol.SASL_PLAINTEXT.name();
-            final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
-            JaasContext context = JaasContext.loadServerContext(new 
ListenerName(listenerName), saslMechanism, configs);
-            MiscUtil.setUGIFromJAASConfig(context.name());
-            UserGroupInformation loginUser = MiscUtil.getUGILoginUser();
-            logger.info("LoginUser={}", loginUser);
-          } catch (Throwable t) {
-            logger.error("Error getting principal.", t);
-          }
-          rangerPlugin = new RangerBasePlugin("kafka", "kafka");
-          logger.info("Calling plugin.init()");
-          rangerPlugin.init();
-          auditHandler = new RangerKafkaAuditHandler();
-          rangerPlugin.setResultProcessor(auditHandler);
+
+    // TODO: provide a real implementation (RANGER-3809)
+    // Currently we return a dummy implementation because KAFKA-13598 makes 
producers idempotent by default and this causes
+    // a failure in the InitProducerId API call on the broker side because of 
the missing acls() method implementation.
+    // Overriding this with a dummy impl will make Kafka return an 
authorization error instead of an exception if the
+    // IDEMPOTENT_WRITE permission wasn't set on the producer.
+    @Override
+    public AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+        SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
+
+        logger.debug("authorizeByResourceType call is not supported by Ranger 
for Kafka yet");
+        return AuthorizationResult.DENIED;
+    }
+
+    private static String mapToRangerAccessType(AclOperation operation) {
+        switch (operation) {
+            case READ:
+                return ACCESS_TYPE_READ;
+            case WRITE:
+                return ACCESS_TYPE_WRITE;
+            case ALTER:
+                return ACCESS_TYPE_CONFIGURE;
+            case DESCRIBE:
+                return ACCESS_TYPE_DESCRIBE;
+            case CLUSTER_ACTION:
+                return ACCESS_TYPE_CLUSTER_ACTION;
+            case CREATE:
+                return ACCESS_TYPE_CREATE;
+            case DELETE:
+                return ACCESS_TYPE_DELETE;
+            case DESCRIBE_CONFIGS:
+                return ACCESS_TYPE_DESCRIBE_CONFIGS;
+            case ALTER_CONFIGS:
+                return ACCESS_TYPE_ALTER_CONFIGS;
+            case IDEMPOTENT_WRITE:
+                return ACCESS_TYPE_IDEMPOTENT_WRITE;
+            case UNKNOWN:
+            case ANY:
+            case ALL:
+            default:
+                return null;
         }
-      }
     }
-  }
-
-  @Override
-  public Map<Endpoint, ? extends CompletionStage<Void>> 
start(AuthorizerServerInfo serverInfo) {
-    return serverInfo.endpoints().stream()
-        .collect(Collectors.toMap(endpoint -> endpoint, endpoint -> 
CompletableFuture.completedFuture(null), (a, b) -> b));
-  }
-
-  @Override
-  public List<AuthorizationResult> authorize(AuthorizableRequestContext 
requestContext, List<Action> actions) {
-    if (rangerPlugin == null) {
-      MiscUtil.logErrorMessageByInterval(logger, "Authorizer is still not 
initialized");
-      return denyAll(actions);
+
+    private static String mapToResourceType(ResourceType resourceType) {
+        switch (resourceType) {
+            case TOPIC:
+                return KEY_TOPIC;
+            case CLUSTER:
+                return KEY_CLUSTER;
+            case GROUP:
+                return KEY_CONSUMER_GROUP;
+            case TRANSACTIONAL_ID:
+                return KEY_TRANSACTIONALID;
+            case DELEGATION_TOKEN:
+                return KEY_DELEGATIONTOKEN;
+            case ANY:
+            case UNKNOWN:
+            default:
+                return null;
+        }
     }
 
-    RangerPerfTracer perf = null;
-    if (RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
-      perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, 
"RangerKafkaAuthorizer.authorize(actions=" + actions + ")");
+    private static RangerAccessResourceImpl createRangerAccessResource(String 
resourceTypeKey, String resourceName) {
+        RangerAccessResourceImpl rangerResource = new 
RangerAccessResourceImpl();
+        rangerResource.setValue(resourceTypeKey, resourceName);
+        return rangerResource;
     }
-    try {
-      return wrappedAuthorization(requestContext, actions);
-    } finally {
-      RangerPerfTracer.log(perf);
+
+    private static RangerAccessRequestImpl createRangerAccessRequest(String 
userName,
+            Set<String> userGroups,
+            String ip,
+            Date eventTime,
+            String resourceTypeKey,
+            String resourceName,
+            String accessType) {
+        RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
+        rangerRequest.setResource(createRangerAccessResource(resourceTypeKey, 
resourceName));
+        rangerRequest.setUser(userName);
+        rangerRequest.setUserGroups(userGroups);
+        rangerRequest.setClientIPAddress(ip);
+        rangerRequest.setAccessTime(eventTime);
+        rangerRequest.setAccessType(accessType);
+        rangerRequest.setAction(accessType);
+        rangerRequest.setRequestData(resourceName);
+        return rangerRequest;
+    }
+
+    private static List<AuthorizationResult> denyAll(List<Action> actions) {
+        return actions.stream().map(a -> 
AuthorizationResult.DENIED).collect(Collectors.toList());
     }
-  }
 
-  private List<AuthorizationResult> 
wrappedAuthorization(AuthorizableRequestContext requestContext, List<Action> 
actions) {
-    if (CollectionUtils.isEmpty(actions)) {
-      return Collections.emptyList();
+    private static List<AuthorizationResult> mapResults(List<Action> actions, 
Collection<RangerAccessResult> results) {
+        if (CollectionUtils.isEmpty(results)) {
+            logger.error("Ranger Plugin returned null or empty. Returning 
Denied for all");
+            return denyAll(actions);
+        }
+        return results.stream()
+                .map(r -> r != null && r.getIsAllowed() ? 
AuthorizationResult.ALLOWED : AuthorizationResult.DENIED)
+                .collect(Collectors.toList());
     }
-    String userName = requestContext.principal() == null ? null : 
requestContext.principal().getName();
-    Set<String> userGroups = MiscUtil.getGroupsForRequestUser(userName);
-    String hostAddress = requestContext.clientAddress() == null ? null : 
requestContext.clientAddress().getHostAddress();
-    String ip = StringUtils.isNotEmpty(hostAddress) && hostAddress.charAt(0) 
== '/' ? hostAddress.substring(1) : hostAddress;
-    Date eventTime = new Date();
-
-    List<RangerAccessRequest> rangerRequests = new ArrayList<>();
-    for (Action action : actions) {
-      String accessType = mapToRangerAccessType(action.operation());
-      if (accessType == null) {
-        MiscUtil.logErrorMessageByInterval(logger, "Unsupported access type, 
requestContext=" + toString(requestContext) +
-            ", actions=" + actions + ", operation=" + action.operation());
-        return denyAll(actions);
-      }
-      String resourceTypeKey = 
mapToResourceType(action.resourcePattern().resourceType());
-      if (resourceTypeKey == null) {
-        MiscUtil.logErrorMessageByInterval(logger, "Unsupported resource type, 
requestContext=" + toString(requestContext) +
-            ", actions=" + actions + ", resourceType=" + 
action.resourcePattern().resourceType());
-        return denyAll(actions);
-      }
-
-      RangerAccessRequestImpl rangerAccessRequest = createRangerAccessRequest(
-          userName,
-          userGroups,
-          ip,
-          eventTime,
-          resourceTypeKey,
-          action.resourcePattern().name(),
-          accessType);
-      rangerRequests.add(rangerAccessRequest);
+
+    private static String toString(AuthorizableRequestContext requestContext) {
+        return requestContext == null ? null :
+                String.format("AuthorizableRequestContext{principal=%s, 
clientAddress=%s, clientId=%s}",
+                        requestContext.principal(), 
requestContext.clientAddress(), requestContext.clientId());
     }
 
-    Collection<RangerAccessResult> results = callRangerPlugin(rangerRequests);
+    private List<AuthorizationResult> 
wrappedAuthorization(AuthorizableRequestContext requestContext, List<Action> 
actions) {
+        if (CollectionUtils.isEmpty(actions)) {
+            return Collections.emptyList();
+        }
+        String      userName    = requestContext.principal() == null ? null : 
requestContext.principal().getName();
+        Set<String> userGroups  = MiscUtil.getGroupsForRequestUser(userName);
+        String      hostAddress = requestContext.clientAddress() == null ? 
null : requestContext.clientAddress().getHostAddress();
+        String      ip          = StringUtils.isNotEmpty(hostAddress) && 
hostAddress.charAt(0) == '/' ? hostAddress.substring(1) : hostAddress;
+        Date        eventTime   = new Date();
+
+        List<RangerAccessRequest> rangerRequests = new ArrayList<>();
+        for (Action action : actions) {
+            String accessType = mapToRangerAccessType(action.operation());
+            if (accessType == null) {
+                MiscUtil.logErrorMessageByInterval(logger, "Unsupported access 
type, requestContext=" + toString(requestContext) + ", actions=" + actions + ", 
operation=" + action.operation());
+                return denyAll(actions);
+            }
+            String resourceTypeKey = 
mapToResourceType(action.resourcePattern().resourceType());
+            if (resourceTypeKey == null) {
+                MiscUtil.logErrorMessageByInterval(logger, "Unsupported 
resource type, requestContext=" + toString(requestContext) + ", actions=" + 
actions + ", resourceType=" + action.resourcePattern().resourceType());
+                return denyAll(actions);
+            }
+
+            RangerAccessRequestImpl rangerAccessRequest = 
createRangerAccessRequest(
+                    userName,
+                    userGroups,
+                    ip,
+                    eventTime,
+                    resourceTypeKey,
+                    action.resourcePattern().name(),
+                    accessType);
+            rangerRequests.add(rangerAccessRequest);
+        }
 
-    List<AuthorizationResult> authorizationResults = mapResults(actions, 
results);
+        Collection<RangerAccessResult> results = 
callRangerPlugin(rangerRequests);
 
-    logger.debug("rangerRequests={}, return={}", rangerRequests, 
authorizationResults);
-    return authorizationResults;
-  }
+        List<AuthorizationResult> authorizationResults = mapResults(actions, 
results);
 
-  private Collection<RangerAccessResult> 
callRangerPlugin(List<RangerAccessRequest> rangerRequests) {
-    try {
-      return rangerPlugin.isAccessAllowed(rangerRequests);
-    } catch (Throwable t) {
-      logger.error("Error while calling isAccessAllowed(). requests={}", 
rangerRequests, t);
-      return null;
-    } finally {
-      auditHandler.flushAudit();
+        logger.debug("rangerRequests={}, return={}", rangerRequests, 
authorizationResults);
+        return authorizationResults;
+    }
+
+    private Collection<RangerAccessResult> 
callRangerPlugin(List<RangerAccessRequest> rangerRequests) {
+        try {
+            return rangerPlugin.isAccessAllowed(rangerRequests);
+        } catch (Throwable t) {
+            logger.error("Error while calling isAccessAllowed(). requests={}", 
rangerRequests, t);
+            return null;
+        } finally {
+            auditHandler.flushAudit();
+        }
     }
-  }
-
-  @Override
-  public List<? extends CompletionStage<AclCreateResult>> 
createAcls(AuthorizableRequestContext requestContext, List<AclBinding> 
aclBindings) {
-    logger.error("createAcls is not supported by Ranger for Kafka");
-
-    return aclBindings.stream()
-        .map(ab -> {
-          CompletableFuture<AclCreateResult> completableFuture = new 
CompletableFuture<>();
-          completableFuture.completeExceptionally(new 
UnsupportedOperationException("createAcls is not supported by Ranger for 
Kafka"));
-          return completableFuture;
-        })
-        .collect(Collectors.toList());
-  }
-
-  @Override
-  public List<? extends CompletionStage<AclDeleteResult>> 
deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> 
aclBindingFilters) {
-    logger.error("deleteAcls is not supported by Ranger for Kafka");
-    return aclBindingFilters.stream()
-        .map(ab -> {
-          CompletableFuture<AclDeleteResult> completableFuture = new 
CompletableFuture<>();
-          completableFuture.completeExceptionally(new 
UnsupportedOperationException("deleteAcls is not supported by Ranger for 
Kafka"));
-          return completableFuture;
-        })
-        .collect(Collectors.toList());
-  }
-
-  // TODO: provide a real implementation (RANGER-3809)
-  // Currently we return a dummy implementation because KAFKA-13598 makes 
producers idempotent by default and this causes
-  // a failure in the InitProducerId API call on the broker side because of 
the missing acls() method implementation.
-  // Overriding this with a dummy impl will make Kafka return an authorization 
error instead of an exception if the
-  // IDEMPOTENT_WRITE permission wasn't set on the producer.
-  @Override
-  public AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
-    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
-
-    logger.debug("authorizeByResourceType call is not supported by Ranger for 
Kafka yet");
-    return AuthorizationResult.DENIED;
-  }
-
-  @Override
-  public Iterable<AclBinding> acls(AclBindingFilter filter) {
-    logger.error("(getting) acls is not supported by Ranger for Kafka");
-    throw new UnsupportedOperationException("(getting) acls is not supported 
by Ranger for Kafka");
-  }
 }
diff --git 
a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java
 
b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java
index 537333a3e..dc872f4d1 100644
--- 
a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java
+++ 
b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java
@@ -19,11 +19,6 @@
 
 package org.apache.ranger.services.kafka;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.ranger.plugin.model.RangerPolicy;
 import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyItem;
@@ -37,107 +32,94 @@ import 
org.apache.ranger.services.kafka.client.ServiceKafkaConnectionMgr;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import static 
org.apache.ranger.plugin.policyengine.RangerPolicyEngine.GROUP_PUBLIC;
 
 public class RangerServiceKafka extends RangerBaseService {
-       private static final Logger LOG = 
LoggerFactory.getLogger(RangerServiceKafka.class);
-       public static final String ACCESS_TYPE_DESCRIBE = "describe";
-
-       public RangerServiceKafka() {
-               super();
-       }
-
-       @Override
-       public void init(RangerServiceDef serviceDef, RangerService service) {
-               super.init(serviceDef, service);
-       }
-
-       @Override
-       public Map<String, Object> validateConfig() throws Exception {
-               Map<String, Object> ret = new HashMap<String, Object>();
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerServiceKafka.validateConfig(" + 
serviceName + ")");
-               }
-
-               if (configs != null) {
-                       try {
-                               ret = 
ServiceKafkaConnectionMgr.connectionTest(serviceName, configs);
-                       } catch (Exception e) {
-                               LOG.error("<== 
RangerServiceKafka.validateConfig Error:" + e);
-                               throw e;
-                       }
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerServiceKafka.validateConfig(" + 
serviceName + "): ret=" + ret);
-               }
-
-               return ret;
-       }
-
-       @Override
-       public List<String> lookupResource(ResourceLookupContext context) 
throws Exception {
-               List<String> ret = null;
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerServiceKafka.lookupResource(" + 
serviceName + ")");
-               }
-
-               if (configs != null) {
-                       ServiceKafkaClient serviceKafkaClient = 
ServiceKafkaConnectionMgr.getKafkaClient(serviceName, configs);
-
-                       ret = serviceKafkaClient.getResources(context);
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerServiceKafka.lookupResource(" + 
serviceName + "): ret=" + ret);
-               }
-
-               return ret;
-       }
-
-       @Override
-       public List<RangerPolicy> getDefaultRangerPolicies() throws Exception {
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> 
RangerServiceKafka.getDefaultRangerPolicies() ");
-               }
-
-               List<RangerPolicy> ret = super.getDefaultRangerPolicies();
-
-               String authType = getConfig().get(RANGER_AUTH_TYPE,"simple");
-
-               if (StringUtils.equalsIgnoreCase(authType, KERBEROS_TYPE)) {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Auth type is " + KERBEROS_TYPE);
-                       }
-               } else {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Auth type is " + authType);
-                       }
-                       for (RangerPolicy defaultPolicy : ret) {
-                               if(defaultPolicy.getName().contains("all")){
-                                       for (RangerPolicy.RangerPolicyItem 
defaultPolicyItem : defaultPolicy.getPolicyItems()) {
-                                               
defaultPolicyItem.addGroup(GROUP_PUBLIC);
-                                       }
-                               }
-                       }
-               }
-               for (RangerPolicy defaultPolicy : ret) {
-                       if (defaultPolicy.getName().contains("all") && 
StringUtils.isNotBlank(lookUpUser)) {
-                               RangerPolicyItem policyItemForLookupUser = new 
RangerPolicyItem();
-                               
policyItemForLookupUser.setUsers(Collections.singletonList(lookUpUser));
-                               
policyItemForLookupUser.setAccesses(Collections.singletonList(
-                                               new 
RangerPolicyItemAccess(ACCESS_TYPE_DESCRIBE)));
-                               policyItemForLookupUser.setDelegateAdmin(false);
-                               
defaultPolicy.addPolicyItem(policyItemForLookupUser);
-                       }
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== 
RangerServiceKafka.getDefaultRangerPolicies() ");
-               }
-               return ret;
-       }
+    private static final Logger LOG                  = 
LoggerFactory.getLogger(RangerServiceKafka.class);
+    public static final  String ACCESS_TYPE_DESCRIBE = "describe";
+
+    public RangerServiceKafka() {
+        super();
+    }
+
+    @Override
+    public void init(RangerServiceDef serviceDef, RangerService service) {
+        super.init(serviceDef, service);
+    }
+
+    @Override
+    public Map<String, Object> validateConfig() throws Exception {
+        Map<String, Object> ret = new HashMap<>();
+
+        LOG.debug("==> RangerServiceKafka.validateConfig({})", serviceName);
+
+        if (configs != null) {
+            try {
+                ret = ServiceKafkaConnectionMgr.connectionTest(serviceName, 
configs);
+            } catch (Exception e) {
+                LOG.error("<== RangerServiceKafka.validateConfig Error:{}", 
String.valueOf(e));
+                throw e;
+            }
+        }
+
+        LOG.debug("<== RangerServiceKafka.validateConfig({}): ret={}", 
serviceName, ret);
+
+        return ret;
+    }
+
+    @Override
+    public List<String> lookupResource(ResourceLookupContext context) throws 
Exception {
+        List<String> ret = null;
+
+        LOG.debug("==> RangerServiceKafka.lookupResource({})", serviceName);
+
+        if (configs != null) {
+            ServiceKafkaClient serviceKafkaClient = 
ServiceKafkaConnectionMgr.getKafkaClient(serviceName, configs);
+
+            ret = serviceKafkaClient.getResources(context);
+        }
+
+        LOG.debug("<== RangerServiceKafka.lookupResource({}): ret={}", 
serviceName, ret);
+
+        return ret;
+    }
+
+    @Override
+    public List<RangerPolicy> getDefaultRangerPolicies() throws Exception {
+        LOG.debug("==> RangerServiceKafka.getDefaultRangerPolicies() ");
+
+        List<RangerPolicy> ret = super.getDefaultRangerPolicies();
+        String authType = getConfig().get(RANGER_AUTH_TYPE, "simple");
+
+        if (StringUtils.equalsIgnoreCase(authType, KERBEROS_TYPE)) {
+            LOG.debug("Auth type is " + KERBEROS_TYPE);
+        } else {
+            LOG.debug("Auth type is {}", authType);
+            for (RangerPolicy defaultPolicy : ret) {
+                if (defaultPolicy.getName().contains("all")) {
+                    for (RangerPolicy.RangerPolicyItem defaultPolicyItem : 
defaultPolicy.getPolicyItems()) {
+                        defaultPolicyItem.addGroup(GROUP_PUBLIC);
+                    }
+                }
+            }
+        }
+
+        for (RangerPolicy defaultPolicy : ret) {
+            if (defaultPolicy.getName().contains("all") && 
StringUtils.isNotBlank(lookUpUser)) {
+                RangerPolicyItem policyItemForLookupUser = new 
RangerPolicyItem();
+                
policyItemForLookupUser.setUsers(Collections.singletonList(lookUpUser));
+                
policyItemForLookupUser.setAccesses(Collections.singletonList(new 
RangerPolicyItemAccess(ACCESS_TYPE_DESCRIBE)));
+                policyItemForLookupUser.setDelegateAdmin(false);
+                defaultPolicy.addPolicyItem(policyItemForLookupUser);
+            }
+        }
+
+        LOG.debug("<== RangerServiceKafka.getDefaultRangerPolicies() ");
+        return ret;
+    }
 }
diff --git 
a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
 
b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
index 82e869511..5eb0b872b 100644
--- 
a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
+++ 
b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
@@ -19,228 +19,198 @@
 
 package org.apache.ranger.services.kafka.client;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.TopicListing;
 import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.TopicListing;
 import org.apache.ranger.plugin.client.BaseClient;
 import org.apache.ranger.plugin.service.ResourceLookupContext;
 import org.apache.ranger.plugin.util.TimedEventUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ServiceKafkaClient {
-       private static final Logger LOG = 
LoggerFactory.getLogger(ServiceKafkaClient.class);
-
-       enum RESOURCE_TYPE {
-               TOPIC
-       }
-
-       String serviceName;
-       Map<String,String > configs;
-       private static final String errMessage = " You can still save the 
repository and start creating "
-                       + "policies, but you would not be able to use 
autocomplete for "
-                       + "resource names. Check server logs for more info.";
-
-       private static final String TOPIC_KEY                           = 
"topic";
-       private static final long   LOOKUP_TIMEOUT_SEC          = 5;
-       private static final String KEY_SASL_MECHANISM          = 
"sasl.mechanism";
-       private static final String KEY_SASL_JAAS_CONFIG        = 
"sasl.jaas.config";
-       private static final String KEY_KAFKA_KEYTAB            = 
"kafka.keytab";
-       private static final String KEY_KAFKA_PRINCIPAL         = 
"kafka.principal";
-       private static final String JAAS_KRB5_MODULE            = 
"com.sun.security.auth.module.Krb5LoginModule required";
-       private static final String JAAS_USE_KEYTAB                     = 
"useKeyTab=true";
-       private static final String JAAS_KEYTAB                         = 
"keyTab=\"";
-       private static final String JAAS_STOKE_KEY                      = 
"storeKey=true";
-       private static final String JAAS_SERVICE_NAME           = 
"serviceName=kafka";
-       private static final String JAAS_USER_TICKET_CACHE      = 
"useTicketCache=false";
-       private static final String JAAS_PRINCIPAL                      = 
"principal=\"";
-
-       public ServiceKafkaClient(String serviceName, Map<String,String> 
configs) {
-               this.serviceName = serviceName;
-               this.configs = configs;
-       }
-
-       public Map<String, Object> connectionTest() {
-               String errMsg = errMessage;
-               Map<String, Object> responseData = new HashMap<String, 
Object>();
-               try {
-                       getTopicList(null);
-                       // If it doesn't throw exception, then assume the 
instance is
-                       // reachable
-                       String successMsg = "ConnectionTest Successful";
-                       BaseClient.generateResponseDataMap(true, successMsg,
-                                       successMsg, null, null, responseData);
-               } catch (Exception e) {
-                       LOG.error("Error connecting to Kafka. kafkaClient=" + 
this, e);
-                       String failureMsg = "Unable to connect to Kafka 
instance."
-                                       + e.getMessage();
-                       BaseClient.generateResponseDataMap(false, failureMsg,
-                                       failureMsg + errMsg, null, null, 
responseData);
-               }
-               return responseData;
-       }
-
-       private List<String> getTopicList(List<String> ignoreTopicList) throws 
Exception {
-               List<String> ret = new ArrayList<String>();
-
-               int sessionTimeout = 5000;
-               int connectionTimeout = 10000;
-               AdminClient adminClient = null;
-
-               try {
-                       Properties props = new Properties();
-                       props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
configs.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
-                       props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, 
configs.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG));
-                       props.put(KEY_SASL_MECHANISM, 
configs.get(KEY_SASL_MECHANISM));
-                       props.put(KEY_SASL_JAAS_CONFIG, getJAASConfig(configs));
-                       props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 
getIntProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, sessionTimeout));
-                       
props.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 
getIntProperty(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 
connectionTimeout));
-                       adminClient = KafkaAdminClient.create(props);
-                       ListTopicsResult listTopicsResult = 
adminClient.listTopics();
-                       if (listTopicsResult != null) {
-                               Collection<TopicListing> topicListings = 
listTopicsResult.listings().get();
-                               for (TopicListing topicListing : topicListings) 
{
-                                       String topicName = topicListing.name();
-                                       if (ignoreTopicList == null || 
!ignoreTopicList.contains(topicName)) {
-                                               ret.add(topicName);
-                                       }
-                               }
-                       }
-               } catch (Exception e) {
-                       throw e;
-               } finally {
-                       if (adminClient != null) {
-                               adminClient.close();
-                       }
-               }
-               return ret;
-       }
-
-
-
-       /**
-        * @param context
-        * @param context
-        * @return
-        */
-       public List<String> getResources(ResourceLookupContext context) {
-
-               String userInput = context.getUserInput();
-               String resource = context.getResourceName();
-               Map<String, List<String>> resourceMap = context.getResources();
-               List<String> resultList = null;
-               List<String> topicList = null;
-
-               RESOURCE_TYPE lookupResource = RESOURCE_TYPE.TOPIC;
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== getResources()  UserInput: \"" + 
userInput
-                                       + "\" resource : " + resource + " 
resourceMap: "
-                                       + resourceMap);
-               }
-
-               if (userInput != null && resource != null) {
-                       if (resourceMap != null && !resourceMap.isEmpty()) {
-                               topicList = resourceMap.get(TOPIC_KEY);
-                       }
-                       switch (resource.trim().toLowerCase()) {
-                               case TOPIC_KEY:
-                                       lookupResource = RESOURCE_TYPE.TOPIC;
-                                       break;
-                               default:
-                                       break;
-                       }
-               }
-
-               if (userInput != null) {
-                       try {
-                               Callable<List<String>> callableObj = null;
-                               final String userInputFinal = userInput;
-
-                               final List<String> finalTopicList = topicList;
-
-                               if (lookupResource == RESOURCE_TYPE.TOPIC) {
-                                       // get the topic list for given Input
-                                       callableObj = new 
Callable<List<String>>() {
-                                               @Override
-                                               public List<String> call() {
-                                                       List<String> retList = 
new ArrayList<String>();
-                                                       try {
-                                                               List<String> 
list = getTopicList(finalTopicList);
-                                                               if 
(userInputFinal != null
-                                                                               
&& !userInputFinal.isEmpty()) {
-                                                                       for 
(String value : list) {
-                                                                               
if (value.startsWith(userInputFinal)) {
-                                                                               
        retList.add(value);
-                                                                               
}
-                                                                       }
-                                                               } else {
-                                                                       
retList.addAll(list);
-                                                               }
-                                                       } catch (Exception ex) {
-                                                               
LOG.error("Error getting topic.", ex);
-                                                       }
-                                                       return retList;
-                                               };
-                                       };
-                               }
-                               // If we need to do lookup
-                               if (callableObj != null) {
-                                       synchronized (this) {
-                                               resultList = 
TimedEventUtil.timedTask(callableObj,
-                                                               
LOOKUP_TIMEOUT_SEC, TimeUnit.SECONDS);
-                                       }
-                               }
-                       } catch (Exception e) {
-                               LOG.error("Unable to get hive resources.", e);
-                       }
-               }
-
-               return resultList;
-       }
-
-       @Override
-       public String toString() {
-               return "ServiceKafkaClient [serviceName=" + serviceName
-                               + ", configs=" + configs + "]";
-       }
-
-       private Integer getIntProperty(String key, int defaultValue) {
-               if (key == null) {
-                       return defaultValue;
-               }
-               String rtrnVal = configs.get(key);
-               if (rtrnVal == null) {
-                       return defaultValue;
-               }
-               return Integer.valueOf(rtrnVal);
-       }
-
-       private String getJAASConfig(Map<String,String> configs){
-               String jaasConfig =  new StringBuilder()
-                               .append(JAAS_KRB5_MODULE).append(" ")
-                               .append(JAAS_USE_KEYTAB).append(" ")
-                               
.append(JAAS_KEYTAB).append(configs.get(KEY_KAFKA_KEYTAB)).append("\"").append("
 ")
-                               .append(JAAS_STOKE_KEY).append(" ")
-                               .append(JAAS_USER_TICKET_CACHE).append(" ")
-                               .append(JAAS_SERVICE_NAME).append(" ")
-                               
.append(JAAS_PRINCIPAL).append(configs.get(KEY_KAFKA_PRINCIPAL)).append("\";")
-                               .toString();
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("KafkaClient JAAS: " + jaasConfig);
-               }
-               return jaasConfig;
-       }
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
+public class ServiceKafkaClient {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ServiceKafkaClient.class);
+
+    private static final String errMessage             = " You can still save 
the repository and start creating policies, but you would not be able to use 
autocomplete for resource names. Check server logs for more info.";
+    private static final String TOPIC_KEY              = "topic";
+    private static final String KEY_SASL_MECHANISM     = "sasl.mechanism";
+    private static final String KEY_SASL_JAAS_CONFIG   = "sasl.jaas.config";
+    private static final String KEY_KAFKA_KEYTAB       = "kafka.keytab";
+    private static final String KEY_KAFKA_PRINCIPAL    = "kafka.principal";
+    private static final String JAAS_KRB5_MODULE       = 
"com.sun.security.auth.module.Krb5LoginModule required";
+    private static final String JAAS_USE_KEYTAB        = "useKeyTab=true";
+    private static final String JAAS_KEYTAB            = "keyTab=\"";
+    private static final String JAAS_STOKE_KEY         = "storeKey=true";
+    private static final String JAAS_SERVICE_NAME      = "serviceName=kafka";
+    private static final String JAAS_USER_TICKET_CACHE = 
"useTicketCache=false";
+    private static final String JAAS_PRINCIPAL         = "principal=\"";
+    private static final long   LOOKUP_TIMEOUT_SEC     = 5;
+
+    String              serviceName;
+    Map<String, String> configs;
+
+    public ServiceKafkaClient(String serviceName, Map<String, String> configs) 
{
+        this.serviceName = serviceName;
+        this.configs     = configs;
+    }
+
+    public Map<String, Object> connectionTest() {
+        String              errMsg       = errMessage;
+        Map<String, Object> responseData = new HashMap<>();
+        try {
+            getTopicList(null);
+            // If it doesn't throw exception, then assume the instance is 
reachable
+            String successMsg = "ConnectionTest Successful";
+            BaseClient.generateResponseDataMap(true, successMsg, successMsg, 
null, null, responseData);
+        } catch (Exception e) {
+            LOG.error("Error connecting to Kafka. kafkaClient = {}", this, e);
+            String failureMsg = "Unable to connect to Kafka instance." + 
e.getMessage();
+            BaseClient.generateResponseDataMap(false, failureMsg, failureMsg + 
errMsg, null, null, responseData);
+        }
+        return responseData;
+    }
+
+    public List<String> getResources(ResourceLookupContext context) {
+        String                    userInput   = context.getUserInput();
+        String                    resource    = context.getResourceName();
+        Map<String, List<String>> resourceMap = context.getResources();
+        List<String>              resultList  = null;
+        List<String>              topicList   = null;
+
+        ResourceType lookupResource = ResourceType.TOPIC;
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== getResources()  UserInput: \"{}\" resource : {} 
resourceMap: {}", userInput, resource, resourceMap);
+        }
+
+        if (userInput != null && resource != null) {
+            if (resourceMap != null && !resourceMap.isEmpty()) {
+                topicList = resourceMap.get(TOPIC_KEY);
+            }
+            if (resource.trim().equalsIgnoreCase(TOPIC_KEY)) {
+                lookupResource = ResourceType.TOPIC;
+            }
+        }
+
+        if (userInput != null) {
+            try {
+                Callable<List<String>> callableObj    = null;
+                final String           userInputFinal = userInput;
+
+                final List<String> finalTopicList = topicList;
+
+                if (lookupResource == ResourceType.TOPIC) {
+                    // get the topic list for given Input
+                    callableObj = () -> {
+                        List<String> retList = new ArrayList<>();
+                        try {
+                            List<String> list = getTopicList(finalTopicList);
+                            if (userInputFinal != null && 
!userInputFinal.isEmpty()) {
+                                for (String value : list) {
+                                    if (value.startsWith(userInputFinal)) {
+                                        retList.add(value);
+                                    }
+                                }
+                            } else {
+                                retList.addAll(list);
+                            }
+                        } catch (Exception ex) {
+                            LOG.error("Error getting topic.", ex);
+                        }
+                        return retList;
+                    };
+                }
+                // If we need to do lookup
+                if (callableObj != null) {
+                    synchronized (this) {
+                        resultList = TimedEventUtil.timedTask(callableObj, 
LOOKUP_TIMEOUT_SEC, TimeUnit.SECONDS);
+                    }
+                }
+            } catch (Exception e) {
+                LOG.error("Unable to get hive resources.", e);
+            }
+        }
+
+        return resultList;
+    }
+
+    @Override
+    public String toString() {
+        return "ServiceKafkaClient [serviceName = " + serviceName + ", configs 
= " + configs + "]";
+    }
+
+    private List<String> getTopicList(List<String> ignoreTopicList) throws 
Exception {
+        List<String> ret = new ArrayList<>();
+
+        int         sessionTimeout    = 5000;
+        int         connectionTimeout = 10000;
+        AdminClient adminClient       = null;
+
+        try {
+            Properties props = new Properties();
+            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
configs.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
+            props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, 
configs.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG));
+            props.put(KEY_SASL_MECHANISM, configs.get(KEY_SASL_MECHANISM));
+            props.put(KEY_SASL_JAAS_CONFIG, getJAASConfig(configs));
+            props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 
getIntProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, sessionTimeout));
+            props.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 
getIntProperty(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 
connectionTimeout));
+            adminClient = KafkaAdminClient.create(props);
+            ListTopicsResult listTopicsResult = adminClient.listTopics();
+            if (listTopicsResult != null) {
+                Collection<TopicListing> topicListings = 
listTopicsResult.listings().get();
+                for (TopicListing topicListing : topicListings) {
+                    String topicName = topicListing.name();
+                    if (ignoreTopicList == null || 
!ignoreTopicList.contains(topicName)) {
+                        ret.add(topicName);
+                    }
+                }
+            }
+        } finally {
+            if (adminClient != null) {
+                adminClient.close();
+            }
+        }
+        return ret;
+    }
+
+    private Integer getIntProperty(String key, int defaultValue) {
+        if (key == null) {
+            return defaultValue;
+        }
+        String returnVal = configs.get(key);
+        if (returnVal == null) {
+            return defaultValue;
+        }
+        return Integer.valueOf(returnVal);
+    }
+
+    private String getJAASConfig(Map<String, String> configs) {
+        String jaasConfig = new StringBuilder()
+                .append(JAAS_KRB5_MODULE).append(" ")
+                .append(JAAS_USE_KEYTAB).append(" ")
+                
.append(JAAS_KEYTAB).append(configs.get(KEY_KAFKA_KEYTAB)).append("\"").append("
 ")
+                .append(JAAS_STOKE_KEY).append(" ")
+                .append(JAAS_USER_TICKET_CACHE).append(" ")
+                .append(JAAS_SERVICE_NAME).append(" ")
+                
.append(JAAS_PRINCIPAL).append(configs.get(KEY_KAFKA_PRINCIPAL)).append("\";")
+                .toString();
+
+        LOG.debug("KafkaClient JAAS: {}", jaasConfig);
+        return jaasConfig;
+    }
+
+    enum ResourceType {
+        TOPIC
+    }
 }
diff --git 
a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java
 
b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java
index 60c55cc13..0e6dd073c 100644
--- 
a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java
+++ 
b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java
@@ -21,81 +21,78 @@ package org.apache.ranger.services.kafka.client;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.clients.admin.AdminClientConfig;
+
 import java.util.Map;
 
 public class ServiceKafkaConnectionMgr {
-       private static final String SEPARATOR                   = ",";
-       private static final String KEY_SASL_MECHANISM  = "sasl.mechanism";
-       private static final String KEY_KAFKA_KEYTAB    = "kafka.keytab";
-       private static final String KEY_KAFKA_PRINCIPAL = "kafka.principal";
+    private static final String SEPARATOR           = ",";
+    private static final String KEY_SASL_MECHANISM  = "sasl.mechanism";
+    private static final String KEY_KAFKA_KEYTAB    = "kafka.keytab";
+    private static final String KEY_KAFKA_PRINCIPAL = "kafka.principal";
+
+    private ServiceKafkaConnectionMgr() {
+        // to block instantiation
+    }
+
+    public static ServiceKafkaClient getKafkaClient(String serviceName, Map<String, String> configs) throws Exception {
+        String error = getServiceConfigValidationErrors(configs);
 
-       static public ServiceKafkaClient getKafkaClient(String serviceName,
-                                                                               
                        Map<String, String> configs) throws Exception {
-               String error = getServiceConfigValidationErrors(configs);
-               if (StringUtils.isNotBlank(error)){
-                       error =  "JAAS configuration missing or not correct in 
Ranger Kafka Service..." + error;
-                       throw new Exception(error);
-               }
-               ServiceKafkaClient serviceKafkaClient = new 
ServiceKafkaClient(serviceName, configs);
-               return serviceKafkaClient;
-       }
+        if (StringUtils.isNotBlank(error)) {
+            error = "JAAS configuration missing or not correct in Ranger Kafka Service. " + error;
+            throw new Exception(error);
+        }
+        return new ServiceKafkaClient(serviceName, configs);
+    }
 
-       /**
-        * @param serviceName
-        * @param configs
-        * @return
-        */
-       public static Map<String, Object> connectionTest(String serviceName,
-                       Map<String, String> configs) throws Exception {
-               ServiceKafkaClient serviceKafkaClient = 
getKafkaClient(serviceName,
-                               configs);
-               return serviceKafkaClient.connectionTest();
-       }
+    public static Map<String, Object> connectionTest(String serviceName, Map<String, String> configs) throws Exception {
+        ServiceKafkaClient serviceKafkaClient = getKafkaClient(serviceName, configs);
+        return serviceKafkaClient.connectionTest();
+    }
 
-       private static String  getServiceConfigValidationErrors(Map<String, 
String> configs) {
-               StringBuilder ret = new StringBuilder();
+    private static String getServiceConfigValidationErrors(Map<String, String> configs) {
+        StringBuilder ret = new StringBuilder();
 
-               String bootstrap_servers = 
configs.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG);
-               String security_protocol = 
configs.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG);
-               String sasl_mechanism = configs.get(KEY_SASL_MECHANISM);
-               String kafka_keytab = configs.get(KEY_KAFKA_KEYTAB);
-               String kafka_principal = configs.get(KEY_KAFKA_PRINCIPAL);
+        String bootstrapServers = configs.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG);
+        String securityProtocol = configs.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG);
+        String saslMechanism    = configs.get(KEY_SASL_MECHANISM);
+        String kafkaKeytab      = configs.get(KEY_KAFKA_KEYTAB);
+        String kafkaPrincipal   = configs.get(KEY_KAFKA_PRINCIPAL);
 
-               if (StringUtils.isEmpty(bootstrap_servers)) {
-                       ret.append(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG);
-               }
+        if (StringUtils.isEmpty(bootstrapServers)) {
+            ret.append(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG);
+        }
 
-               if (StringUtils.isEmpty(security_protocol)) {
-                       if (StringUtils.isNotBlank(ret.toString())) {
-                               
ret.append(SEPARATOR).append(AdminClientConfig.SECURITY_PROTOCOL_CONFIG);
-                       } else {
-                               
ret.append(AdminClientConfig.SECURITY_PROTOCOL_CONFIG);
-                       }
-               }
+        if (StringUtils.isEmpty(securityProtocol)) {
+            if (StringUtils.isNotBlank(ret.toString())) {
+                ret.append(SEPARATOR).append(AdminClientConfig.SECURITY_PROTOCOL_CONFIG);
+            } else {
+                ret.append(AdminClientConfig.SECURITY_PROTOCOL_CONFIG);
+            }
+        }
 
-               if (StringUtils.isEmpty(sasl_mechanism)) {
-                       if (StringUtils.isNotBlank(ret.toString())) {
-                               
ret.append(SEPARATOR).append(KEY_SASL_MECHANISM);
-                       } else {
-                               ret.append(KEY_SASL_MECHANISM);
-                       }
-               }
+        if (StringUtils.isEmpty(saslMechanism)) {
+            if (StringUtils.isNotBlank(ret.toString())) {
+                ret.append(SEPARATOR).append(KEY_SASL_MECHANISM);
+            } else {
+                ret.append(KEY_SASL_MECHANISM);
+            }
+        }
 
-               if (StringUtils.isEmpty(kafka_keytab)) {
-                       if (StringUtils.isNotBlank(ret.toString())) {
-                               ret.append(SEPARATOR).append(KEY_KAFKA_KEYTAB);
-                       } else {
-                               ret.append(KEY_KAFKA_KEYTAB);
-                       }
-               }
+        if (StringUtils.isEmpty(kafkaKeytab)) {
+            if (StringUtils.isNotBlank(ret.toString())) {
+                ret.append(SEPARATOR).append(KEY_KAFKA_KEYTAB);
+            } else {
+                ret.append(KEY_KAFKA_KEYTAB);
+            }
+        }
 
-               if (StringUtils.isEmpty(kafka_principal)) {
-                       if (StringUtils.isNotBlank(ret.toString())) {
-                               
ret.append(SEPARATOR).append(KEY_KAFKA_PRINCIPAL);
-                       } else {
-                               ret.append(KEY_KAFKA_PRINCIPAL);
-                       }
-               }
-               return ret.toString();
-       }
+        if (StringUtils.isEmpty(kafkaPrincipal)) {
+            if (StringUtils.isNotBlank(ret.toString())) {
+                ret.append(SEPARATOR).append(KEY_KAFKA_PRINCIPAL);
+            } else {
+                ret.append(KEY_KAFKA_PRINCIPAL);
+            }
+        }
+        return ret.toString();
+    }
 }
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
index 4ddf75818..ae467a389 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
@@ -17,22 +17,8 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
-import java.io.File;
-import java.net.ServerSocket;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.FileSystems;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
+import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingServer;
@@ -49,37 +35,49 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import kafka.server.KafkaConfig;
 import scala.Some;
 
+import java.io.File;
+import java.net.ServerSocket;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
 /**
  * A simple test that starts a Kafka broker, creates "test" and "dev" topics,
  * sends a message to them and consumes it.
  * The RangerKafkaAuthorizer enforces the following authorization rules:
- *
- *  - The "IT" group can do anything
- *  - The "public" group can "read/describe/write" on the "test" topic.
- *
+ * <p>
+ * - The "IT" group can do anything
+ * - The "public" group can "read/describe/write" on the "test" topic.
+ * <p>
  * Policies available from admin via:
- *
+ * <p>
  * http://localhost:6080/service/plugins/policies/download/cl1_kafka
- *
+ * <p>
  * Authentication is done via Kerberos/GSS.
  */
 public class KafkaRangerAuthorizerGSSTest {
-    private final static Logger LOG = LoggerFactory.getLogger(KafkaRangerAuthorizerGSSTest.class);
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaRangerAuthorizerGSSTest.class);
 
-    private static KafkaServer kafkaServer;
-    private static TestingServer zkServer;
-    private static int port;
-    private static Path tempDir;
+    private static KafkaServer     kafkaServer;
+    private static TestingServer   zkServer;
+    private static int             port;
+    private static Path            tempDir;
     private static SimpleKdcServer kerbyServer;
 
     @BeforeAll
@@ -92,7 +90,7 @@ public class KafkaRangerAuthorizerGSSTest {
         configureKerby(basedir);
 
         // JAAS Config file - We need to point to the correct keytab files
-        Path path = FileSystems.getDefault().getPath(basedir, "/src/test/resources/kafka_kerberos.jaas");
+        Path   path    = FileSystems.getDefault().getPath(basedir, "/src/test/resources/kafka_kerberos.jaas");
         String content = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
         content = content.replaceAll("<basedir>", basedir);
         //content = content.replaceAll("zookeeper/localhost", "zookeeper/" + address);
@@ -103,12 +101,12 @@ public class KafkaRangerAuthorizerGSSTest {
         System.setProperty("java.security.auth.login.config", 
path2.toString());
 
         // Set up Zookeeper to require SASL
-        Map<String,Object> zookeeperProperties = new HashMap<>();
+        Map<String, Object> zookeeperProperties = new HashMap<>();
         zookeeperProperties.put("authProvider.1", 
"org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
         zookeeperProperties.put("requireClientAuthScheme", "sasl");
         zookeeperProperties.put("jaasLoginRenew", "3600000");
 
-        InstanceSpec instanceSpec = new InstanceSpec(null, -1, -1, -1, true, 1,-1, -1, zookeeperProperties, "localhost");
+        InstanceSpec instanceSpec = new InstanceSpec(null, -1, -1, -1, true, 1, -1, -1, zookeeperProperties, "localhost");
 
         zkServer = new TestingServer(instanceSpec, true);
 
@@ -158,39 +156,6 @@ public class KafkaRangerAuthorizerGSSTest {
         KafkaTestUtils.createSomeTopics(adminProps);
     }
 
-    private static void configureKerby(String baseDir) throws Exception {
-
-        //System.setProperty("sun.security.krb5.debug", "true");
-        System.setProperty("java.security.krb5.conf", baseDir + 
"/target/krb5.conf");
-
-        kerbyServer = new SimpleKdcServer();
-
-        kerbyServer.setKdcRealm("kafka.apache.org");
-        kerbyServer.setAllowUdp(false);
-        kerbyServer.setWorkDir(new File(baseDir + "/target"));
-
-        kerbyServer.init();
-
-        // Create principals
-        String zookeeper = "zookeeper/localhost@kafka.apache.org";
-        String kafka = "kafka/localhost@kafka.apache.org";
-        String client = "client@kafka.apache.org";
-
-        kerbyServer.createPrincipal(zookeeper, "zookeeper");
-        File keytabFile = new File(baseDir + "/target/zookeeper.keytab");
-        kerbyServer.exportPrincipal(zookeeper, keytabFile);
-
-        kerbyServer.createPrincipal(kafka, "kafka");
-        keytabFile = new File(baseDir + "/target/kafka.keytab");
-        kerbyServer.exportPrincipal(kafka, keytabFile);
-
-        kerbyServer.createPrincipal(client, "client");
-        keytabFile = new File(baseDir + "/target/client.keytab");
-        kerbyServer.exportPrincipal(client, keytabFile);
-
-        kerbyServer.start();
-    }
-
     @AfterAll
     public static void cleanup() throws Exception {
         if (kafkaServer != null) {
@@ -262,38 +227,9 @@ public class KafkaRangerAuthorizerGSSTest {
         }
     }
 
-    private void checkTopicExists(final KafkaConsumer<String, String> 
consumer) {
-        Map<String, List<PartitionInfo>> topics = consumer.listTopics();
-        while (!topics.containsKey("test")) {
-            LOG.warn("Required topic is not available, only {} present", 
topics.keySet());
-            sleep();
-            topics = consumer.listTopics();
-        }
-        LOG.warn("Available topics: {}", topics.keySet());
-    }
-
-    private void sendMessage(final Producer<String, String> producer) {
-        // Send a message
-        try {
-            LOG.info("Send a message to 'test'");
-            producer.send(new ProducerRecord<>("test", "somekey", 
"somevalue"));
-            producer.flush();
-        } catch (RuntimeException e) {
-            LOG.error("Unable to send message to topic 'test' ", e);
-        }
-    }
-
-    private void sleep() {
-        try {
-            Thread.sleep(1000);
-        } catch (InterruptedException e) {
-            LOG.info("Interrupted sleep, nothing important");
-        }
-    }
-
     // The "public" group can't write to "dev"
     @Test
-    public void testUnauthorizedWrite() throws Exception {
+    public void testUnauthorizedWrite() {
         // Create the Producer
         Properties producerProps = new Properties();
         producerProps.put("bootstrap.servers", "localhost:" + port);
@@ -315,7 +251,6 @@ public class KafkaRangerAuthorizerGSSTest {
         }
     }
 
-
     @Test
     public void testAuthorizedIdempotentWrite() throws Exception {
         // Create the Producer
@@ -336,4 +271,64 @@ public class KafkaRangerAuthorizerGSSTest {
             record.get();
         }
     }
+
+    private static void configureKerby(String baseDir) throws Exception {
+        System.setProperty("java.security.krb5.conf", baseDir + "/target/krb5.conf");
+
+        kerbyServer = new SimpleKdcServer();
+
+        kerbyServer.setKdcRealm("kafka.apache.org");
+        kerbyServer.setAllowUdp(false);
+        kerbyServer.setWorkDir(new File(baseDir + "/target"));
+
+        kerbyServer.init();
+
+        // Create principals
+        String zookeeper = "zookeeper/localhost@kafka.apache.org";
+        String kafka     = "kafka/localhost@kafka.apache.org";
+        String client    = "client@kafka.apache.org";
+
+        kerbyServer.createPrincipal(zookeeper, "zookeeper");
+        File keytabFile = new File(baseDir + "/target/zookeeper.keytab");
+        kerbyServer.exportPrincipal(zookeeper, keytabFile);
+
+        kerbyServer.createPrincipal(kafka, "kafka");
+        keytabFile = new File(baseDir + "/target/kafka.keytab");
+        kerbyServer.exportPrincipal(kafka, keytabFile);
+
+        kerbyServer.createPrincipal(client, "client");
+        keytabFile = new File(baseDir + "/target/client.keytab");
+        kerbyServer.exportPrincipal(client, keytabFile);
+
+        kerbyServer.start();
+    }
+
+    private void checkTopicExists(final KafkaConsumer<String, String> consumer) {
+        Map<String, List<PartitionInfo>> topics = consumer.listTopics();
+        while (!topics.containsKey("test")) {
+            LOG.warn("Required topic is not available, only {} present", topics.keySet());
+            sleep();
+            topics = consumer.listTopics();
+        }
+        LOG.warn("Available topics: {}", topics.keySet());
+    }
+
+    private void sendMessage(final Producer<String, String> producer) {
+        // Send a message
+        try {
+            LOG.info("Send a message to 'test'");
+            producer.send(new ProducerRecord<>("test", "somekey", "somevalue"));
+            producer.flush();
+        } catch (RuntimeException e) {
+            LOG.error("Unable to send message to topic 'test' ", e);
+        }
+    }
+
+    private void sleep() {
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            LOG.info("Interrupted sleep, nothing important");
+        }
+    }
 }
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
index 4c777c7fc..1dacb988c 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java
@@ -17,18 +17,8 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
-import java.io.File;
-import java.io.OutputStream;
-import java.math.BigInteger;
-import java.net.ServerSocket;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.security.KeyStore;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -48,38 +38,47 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
-
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
 import scala.Some;
 
+import java.io.File;
+import java.io.OutputStream;
+import java.math.BigInteger;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.KeyStore;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
 /**
- * A simple test that starts a Kafka broker, creates "test" and "dev" topics, 
sends a message to them and consumes it. We also plug in a 
+ * A simple test that starts a Kafka broker, creates "test" and "dev" topics, 
sends a message to them and consumes it. We also plug in a
  * CustomAuthorizer that enforces some authorization rules:
- * 
- *  - The "IT" group can do anything
- *  - The "public" group can "read/describe/write" on the "test" topic.
- *  - The "public" group can only "read/describe" on the "dev" topic, but not 
write.
- * 
+ * <p>
+ * - The "IT" group can do anything
+ * - The "public" group can "read/describe/write" on the "test" topic.
+ * - The "public" group can only "read/describe" on the "dev" topic, but not 
write.
+ * <p>
  * Policies available from admin via:
- * 
+ * <p>
  * http://localhost:6080/service/plugins/policies/download/cl1_kafka
- * 
+ * <p>
  * Clients and services authenticate to Kafka using the SASL SSL protocol as 
part of this test.
  */
 @Disabled("Causing JVM to abort on some platforms")
 public class KafkaRangerAuthorizerSASLSSLTest {
-    private static KafkaServer kafkaServer;
+    private static KafkaServer   kafkaServer;
     private static TestingServer zkServer;
-    private static int port;
-    private static String serviceKeystorePath;
-    private static String clientKeystorePath;
-    private static String truststorePath;
-    private static Path tempDir;
+    private static int           port;
+    private static String        serviceKeystorePath;
+    private static String        clientKeystorePath;
+    private static String        truststorePath;
+    private static Path          tempDir;
 
     @BeforeAll
     public static void setup() throws Exception {
-       // JAAS Config file
+        // JAAS Config file
         String basedir = System.getProperty("basedir");
         if (basedir == null) {
             basedir = new File(".").getCanonicalPath();
@@ -87,30 +86,30 @@ public class KafkaRangerAuthorizerSASLSSLTest {
 
         File f = new File(basedir + "/src/test/resources/kafka_plain.jaas");
         System.setProperty("java.security.auth.login.config", f.getPath());
-        
-       // Create keys
-       String serviceDN = "CN=Service,O=Apache,L=Dublin,ST=Leinster,C=IE";
-       String clientDN = "CN=Client,O=Apache,L=Dublin,ST=Leinster,C=IE";
-       
-       // Create a truststore
-       KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType());
-       keystore.load(null, "security".toCharArray());
-       
-       serviceKeystorePath = 
-                       KafkaTestUtils.createAndStoreKey(serviceDN, serviceDN, 
BigInteger.valueOf(30), 
-                                       "sspass", "myservicekey", "skpass", 
keystore);
-       clientKeystorePath = 
-                       KafkaTestUtils.createAndStoreKey(clientDN, clientDN, 
BigInteger.valueOf(31), 
-                                       "cspass", "myclientkey", "ckpass", 
keystore);
-       
-       File truststoreFile = File.createTempFile("kafkatruststore", ".jks");
-       try (OutputStream output = 
Files.newOutputStream(truststoreFile.toPath())) {
-               keystore.store(output, "security".toCharArray());
-       }
-       truststorePath = truststoreFile.getPath();
-                       
+
+        // Create keys
+        String serviceDN = "CN=Service,O=Apache,L=Dublin,ST=Leinster,C=IE";
+        String clientDN  = "CN=Client,O=Apache,L=Dublin,ST=Leinster,C=IE";
+
+        // Create a truststore
+        KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType());
+        keystore.load(null, "security".toCharArray());
+
+        serviceKeystorePath =
+                KafkaTestUtils.createAndStoreKey(serviceDN, serviceDN, BigInteger.valueOf(30),
+                        "sspass", "myservicekey", "skpass", keystore);
+        clientKeystorePath  =
+                KafkaTestUtils.createAndStoreKey(clientDN, clientDN, BigInteger.valueOf(31),
+                        "cspass", "myclientkey", "ckpass", keystore);
+
+        File truststoreFile = File.createTempFile("kafkatruststore", ".jks");
+        try (OutputStream output = Files.newOutputStream(truststoreFile.toPath())) {
+            keystore.store(output, "security".toCharArray());
+        }
+        truststorePath = truststoreFile.getPath();
+
         zkServer = new TestingServer();
-        
+
         // Get a random port
         ServerSocket serverSocket = new ServerSocket(0);
         port = serverSocket.getLocalPort();
@@ -144,10 +143,10 @@ public class KafkaRangerAuthorizerSASLSSLTest {
 
         // Plug in Apache Ranger authorizer
         props.put("authorizer.class.name", 
"org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer");
-        
+
         // Create users for testing
         UserGroupInformation.createUserForTesting("alice", new String[] 
{"IT"});
-        
+
         KafkaConfig config = new KafkaConfig(props);
         kafkaServer = new KafkaServer(config, Time.SYSTEM, new 
Some<String>("KafkaRangerAuthorizerSASLSSLTest"), false);
         kafkaServer.startup();
@@ -165,7 +164,7 @@ public class KafkaRangerAuthorizerSASLSSLTest {
         adminProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
         KafkaTestUtils.createSomeTopics(adminProps);
     }
-    
+
     @AfterAll
     public static void cleanup() throws Exception {
         if (kafkaServer != null) {
@@ -174,24 +173,24 @@ public class KafkaRangerAuthorizerSASLSSLTest {
         if (zkServer != null) {
             zkServer.stop();
         }
-        
+
         File clientKeystoreFile = new File(clientKeystorePath);
         if (clientKeystoreFile.exists()) {
-               FileUtils.forceDelete(clientKeystoreFile);
+            FileUtils.forceDelete(clientKeystoreFile);
         }
         File serviceKeystoreFile = new File(serviceKeystorePath);
         if (serviceKeystoreFile.exists()) {
-               FileUtils.forceDelete(serviceKeystoreFile);
+            FileUtils.forceDelete(serviceKeystoreFile);
         }
         File truststoreFile = new File(truststorePath);
         if (truststoreFile.exists()) {
-               FileUtils.forceDelete(truststoreFile);
+            FileUtils.forceDelete(truststoreFile);
         }
         if (tempDir != null) {
             FileUtils.deleteDirectory(tempDir.toFile());
         }
     }
-    
+
     @Test
     public void testAuthorizedRead() throws Exception {
         // Create the Producer
@@ -202,7 +201,7 @@ public class KafkaRangerAuthorizerSASLSSLTest {
         producerProps.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
         producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
"SASL_SSL");
         producerProps.put("sasl.mechanism", "PLAIN");
-        
+
         producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
         producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 
serviceKeystorePath);
         producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "sspass");
@@ -256,7 +255,7 @@ public class KafkaRangerAuthorizerSASLSSLTest {
             }
         }
     }
-    
+
     @Test
     public void testAuthorizedWrite() throws Exception {
         // Create the Producer
@@ -284,5 +283,4 @@ public class KafkaRangerAuthorizerSASLSSLTest {
             record.get();
         }
     }
-    
 }
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
index 2ff2c1083..346682851 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java
@@ -17,19 +17,7 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.OutputStream;
-import java.math.BigInteger;
-import java.net.ServerSocket;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.security.KeyStore;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
+import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
@@ -44,74 +32,84 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.utils.Time;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import kafka.server.KafkaConfig;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import scala.Some;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.math.BigInteger;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.KeyStore;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
 /**
- * A simple test that starts a Kafka broker, creates "test" and "dev" topics, 
sends a message to them and consumes it. We also plug in a 
+ * A simple test that starts a Kafka broker, creates "test" and "dev" topics, 
sends a message to them and consumes it. We also plug in a
  * CustomAuthorizer that enforces some authorization rules:
- * 
- *  - The "IT" group can do anything
- *  - The "public" group can "read/describe/write" on the "test" topic.
- *  - The "public" group can only "read/describe" on the "dev" topic, but not 
write.
- *
+ * <p>
+ * - The "IT" group can do anything
+ * - The "public" group can "read/describe/write" on the "test" topic.
+ * - The "public" group can only "read/describe" on the "dev" topic, but not 
write.
+ * <p>
  * In addition we have a TAG based policy, which grants "read/describe" access 
to the "public" group to the "messages" topic (which is associated
  * with the tag called "MessagesTag". A "kafka_topic" entity was created in 
Apache Atlas + then associated with the "MessagesTag". This was
  * then imported into Ranger using the TagSyncService. The policies were then 
downloaded locally and saved for testing off-line.
- * 
+ * <p>
  * Policies available from admin via:
- * 
+ * <p>
  * http://localhost:6080/service/plugins/policies/download/cl1_kafka
  */
 public class KafkaRangerAuthorizerTest {
-    
-    private static KafkaServer kafkaServer;
+    private static KafkaServer   kafkaServer;
     private static TestingServer zkServer;
-    private static int port;
-    private static String serviceKeystorePath;
-    private static String clientKeystorePath;
-    private static String truststorePath;
-    private static Path tempDir;
-    
+    private static int           port;
+    private static String        serviceKeystorePath;
+    private static String        clientKeystorePath;
+    private static String        truststorePath;
+    private static Path          tempDir;
+
     @BeforeAll
     public static void setup() throws Exception {
-       // Create keys
+        // Create keys
         String serviceDN = "CN=localhost,O=Apache,L=Dublin,ST=Leinster,C=IE";
-        String clientDN = "CN=localhost,O=Apache,L=Dublin,ST=Leinster,C=IE";
-       
-       // Create a truststore
-       KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType());
-       keystore.load(null, "security".toCharArray());
-       
-       serviceKeystorePath = 
-                       KafkaTestUtils.createAndStoreKey(serviceDN, serviceDN, 
BigInteger.valueOf(30), 
-                                       "sspass", "myservicekey", "skpass", 
keystore);
-       clientKeystorePath = 
-                       KafkaTestUtils.createAndStoreKey(clientDN, clientDN, 
BigInteger.valueOf(31), 
-                                       "cspass", "myclientkey", "ckpass", 
keystore);
-       
-       File truststoreFile = File.createTempFile("kafkatruststore", ".jks");
-       try (OutputStream output = new FileOutputStream(truststoreFile)) {
-               keystore.store(output, "security".toCharArray());
-       }
-       truststorePath = truststoreFile.getPath();
-                       
+        String clientDN  = "CN=localhost,O=Apache,L=Dublin,ST=Leinster,C=IE";
+
+        // Create a truststore
+        KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType());
+        keystore.load(null, "security".toCharArray());
+
+        serviceKeystorePath =
+                KafkaTestUtils.createAndStoreKey(serviceDN, serviceDN, BigInteger.valueOf(30),
+                        "sspass", "myservicekey", "skpass", keystore);
+        clientKeystorePath  =
+                KafkaTestUtils.createAndStoreKey(clientDN, clientDN, BigInteger.valueOf(31),
+                        "cspass", "myclientkey", "ckpass", keystore);
+
+        File truststoreFile = File.createTempFile("kafkatruststore", ".jks");
+        try (OutputStream output = new FileOutputStream(truststoreFile)) {
+            keystore.store(output, "security".toCharArray());
+        }
+        truststorePath = truststoreFile.getPath();
+
         zkServer = new TestingServer();
-               zkServer.start() ;
-        
+        zkServer.start();
+
         // Get a random port
         try (ServerSocket serverSocket = new ServerSocket(0)) {
-                       Assertions.assertNotNull(serverSocket) ;
-                       port = serverSocket.getLocalPort() ;
-                       Assertions.assertTrue(port > 0) ;
-               } catch (java.io.IOException e) {
-                       throw new RuntimeException("Local socket port not 
available", e) ;
-               }
+            Assertions.assertNotNull(serverSocket);
+            port = serverSocket.getLocalPort();
+            Assertions.assertTrue(port > 0);
+        } catch (java.io.IOException e) {
+            throw new RuntimeException("Local socket port not available", e);
+        }
 
         tempDir = Files.createTempDirectory("kafka");
 
@@ -137,11 +135,11 @@ public class KafkaRangerAuthorizerTest {
 
         // Plug in Apache Ranger authorizer
         props.put("authorizer.class.name", 
"org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer");
-        
+
         // Create users for testing
         UserGroupInformation.createUserForTesting(clientDN, new String[] 
{"public"});
         UserGroupInformation.createUserForTesting(serviceDN, new String[] 
{"IT"});
-        
+
         KafkaConfig config = new KafkaConfig(props);
         kafkaServer = new KafkaServer(config, Time.SYSTEM, new 
Some<String>("KafkaRangerAuthorizerTest"), false);
         kafkaServer.startup();
@@ -158,7 +156,7 @@ public class KafkaRangerAuthorizerTest {
         adminProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");
         KafkaTestUtils.createSomeTopics(adminProps);
     }
-    
+
     @AfterAll
     public static void cleanup() throws Exception {
         if (kafkaServer != null) {
@@ -167,24 +165,24 @@ public class KafkaRangerAuthorizerTest {
         if (zkServer != null) {
             zkServer.stop();
         }
-        
+
         File clientKeystoreFile = new File(clientKeystorePath);
         if (clientKeystoreFile.exists()) {
-               FileUtils.forceDelete(clientKeystoreFile);
+            FileUtils.forceDelete(clientKeystoreFile);
         }
         File serviceKeystoreFile = new File(serviceKeystorePath);
         if (serviceKeystoreFile.exists()) {
-               FileUtils.forceDelete(serviceKeystoreFile);
+            FileUtils.forceDelete(serviceKeystoreFile);
         }
         File truststoreFile = new File(truststorePath);
         if (truststoreFile.exists()) {
-               FileUtils.forceDelete(truststoreFile);
+            FileUtils.forceDelete(truststoreFile);
         }
         if (tempDir != null) {
             FileUtils.deleteDirectory(tempDir.toFile());
         }
     }
-    
+
     // The "public" group can read from "test"
     @Test
     public void testAuthorizedRead() throws Exception {
@@ -201,7 +199,7 @@ public class KafkaRangerAuthorizerTest {
         producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
         producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
truststorePath);
         producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
"security");
-        
+
         // Create the Consumer
         Properties consumerProps = new Properties();
         consumerProps.put("bootstrap.servers", "localhost:" + port);
@@ -244,7 +242,7 @@ public class KafkaRangerAuthorizerTest {
             Assertions.assertEquals("somevalue", record.value());
         }
     }
-    
+
     // The "IT" group can write to any topic
     @Test
     public void testAuthorizedWrite() throws Exception {
@@ -269,7 +267,7 @@ public class KafkaRangerAuthorizerTest {
             record.get();
         }
     }
-    
+
     // The "public" group can write to "test" but not "dev"
     @Test
     public void testUnauthorizedWrite() throws Exception {
@@ -290,7 +288,7 @@ public class KafkaRangerAuthorizerTest {
         try (Producer<String, String> producer = new 
KafkaProducer<>(producerProps)) {
             // Send a message
             Future<RecordMetadata> record =
-                producer.send(new ProducerRecord<>("test", "somekey", 
"somevalue"));
+                    producer.send(new ProducerRecord<>("test", "somekey", 
"somevalue"));
             producer.flush();
             record.get();
 
@@ -363,5 +361,4 @@ public class KafkaRangerAuthorizerTest {
             Assertions.assertEquals("somevalue", record.value());
         }
     }
-
 }
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java
index 6fb8ccc9f..e1420138b 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java
@@ -17,17 +17,8 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
-import java.io.File;
-import java.net.ServerSocket;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.FileSystems;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingServer;
@@ -46,19 +37,26 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
 import scala.Some;
 
+import java.io.File;
+import java.net.ServerSocket;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
 
 public class KafkaRangerTopicCreationTest {
-    private final static Logger LOG = 
LoggerFactory.getLogger(KafkaRangerTopicCreationTest.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaRangerTopicCreationTest.class);
 
-    private static KafkaServer kafkaServer;
-    private static TestingServer zkServer;
-    private static int port;
-    private static Path tempDir;
+    private static KafkaServer     kafkaServer;
+    private static TestingServer   zkServer;
+    private static int             port;
+    private static Path            tempDir;
     private static SimpleKdcServer kerbyServer;
 
     @BeforeAll
@@ -72,7 +70,7 @@ public class KafkaRangerTopicCreationTest {
         configureKerby(basedir);
 
         // JAAS Config file - We need to point to the correct keytab files
-        Path path = FileSystems.getDefault().getPath(basedir, 
"/src/test/resources/kafka_kerberos.jaas");
+        Path   path    = FileSystems.getDefault().getPath(basedir, 
"/src/test/resources/kafka_kerberos.jaas");
         String content = new String(Files.readAllBytes(path), 
StandardCharsets.UTF_8);
         content = content.replaceAll("<basedir>", basedir);
         //content = content.replaceAll("zookeeper/localhost", "zookeeper/" + 
address);
@@ -83,12 +81,12 @@ public class KafkaRangerTopicCreationTest {
         System.setProperty("java.security.auth.login.config", 
path2.toString());
 
         // Set up Zookeeper to require SASL
-        Map<String,Object> zookeeperProperties = new HashMap<>();
+        Map<String, Object> zookeeperProperties = new HashMap<>();
         zookeeperProperties.put("authProvider.1", 
"org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
         zookeeperProperties.put("requireClientAuthScheme", "sasl");
         zookeeperProperties.put("jaasLoginRenew", "3600000");
 
-        InstanceSpec instanceSpec = new InstanceSpec(null, -1, -1, -1, true, 
1,-1, -1, zookeeperProperties, "localhost");
+        InstanceSpec instanceSpec = new InstanceSpec(null, -1, -1, -1, true, 
1, -1, -1, zookeeperProperties, "localhost");
 
         zkServer = new TestingServer(instanceSpec, true);
 
@@ -129,39 +127,6 @@ public class KafkaRangerTopicCreationTest {
         KafkaConfig config = new KafkaConfig(props);
         kafkaServer = new KafkaServer(config, Time.SYSTEM, new 
Some<String>("KafkaRangerTopicCreationTest"), false);
         kafkaServer.startup();
-   }
-
-    private static void configureKerby(String baseDir) throws Exception {
-
-        //System.setProperty("sun.security.krb5.debug", "true");
-        System.setProperty("java.security.krb5.conf", baseDir + 
"/target/krb5.conf");
-
-        kerbyServer = new SimpleKdcServer();
-
-        kerbyServer.setKdcRealm("kafka.apache.org");
-        kerbyServer.setAllowUdp(false);
-        kerbyServer.setWorkDir(new File(baseDir + "/target"));
-
-        kerbyServer.init();
-
-        // Create principals
-        String zookeeper = "zookeeper/localhost@kafka.apache.org";
-        String kafka = "kafka/localhost@kafka.apache.org";
-        String client = "client@kafka.apache.org";
-
-        kerbyServer.createPrincipal(zookeeper, "zookeeper");
-        File keytabFile = new File(baseDir + "/target/zookeeper.keytab");
-        kerbyServer.exportPrincipal(zookeeper, keytabFile);
-
-        kerbyServer.createPrincipal(kafka, "kafka");
-        keytabFile = new File(baseDir + "/target/kafka.keytab");
-        kerbyServer.exportPrincipal(kafka, keytabFile);
-
-        kerbyServer.createPrincipal(client, "client");
-        keytabFile = new File(baseDir + "/target/client.keytab");
-        kerbyServer.exportPrincipal(client, keytabFile);
-
-        kerbyServer.start();
     }
 
     @AfterAll
@@ -182,8 +147,8 @@ public class KafkaRangerTopicCreationTest {
 
     @Test
     public void testCreateTopic() throws Exception {
-        final String topic = "test";
-        Properties properties = new Properties();
+        final String topic      = "test";
+        Properties   properties = new Properties();
         properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:" + port);
         properties.put("client.id", "test-consumer-id");
         properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
"SASL_PLAINTEXT");
@@ -192,10 +157,41 @@ public class KafkaRangerTopicCreationTest {
             result.values().get(topic).get();
             for (Map.Entry<String, KafkaFuture<Void>> entry : 
result.values().entrySet()) {
                 System.out.println("Create Topic : " + entry.getKey() + " " +
-                    "isCancelled : " + entry.getValue().isCancelled() + " " +
-                    "isCompletedExceptionally : " + 
entry.getValue().isCompletedExceptionally() + " " +
-                    "isDone : " + entry.getValue().isDone());
+                        "isCancelled : " + entry.getValue().isCancelled() + " " +
+                        "isCompletedExceptionally : " + 
entry.getValue().isCompletedExceptionally() + " " +
+                        "isDone : " + entry.getValue().isDone());
             }
         }
     }
+
+    private static void configureKerby(String baseDir) throws Exception {
+        System.setProperty("java.security.krb5.conf", baseDir + 
"/target/krb5.conf");
+
+        kerbyServer = new SimpleKdcServer();
+
+        kerbyServer.setKdcRealm("kafka.apache.org");
+        kerbyServer.setAllowUdp(false);
+        kerbyServer.setWorkDir(new File(baseDir + "/target"));
+
+        kerbyServer.init();
+
+        // Create principals
+        String zookeeper = "zookeeper/localhost@kafka.apache.org";
+        String kafka     = "kafka/localhost@kafka.apache.org";
+        String client    = "client@kafka.apache.org";
+
+        kerbyServer.createPrincipal(zookeeper, "zookeeper");
+        File keytabFile = new File(baseDir + "/target/zookeeper.keytab");
+        kerbyServer.exportPrincipal(zookeeper, keytabFile);
+
+        kerbyServer.createPrincipal(kafka, "kafka");
+        keytabFile = new File(baseDir + "/target/kafka.keytab");
+        kerbyServer.exportPrincipal(kafka, keytabFile);
+
+        kerbyServer.createPrincipal(client, "client");
+        keytabFile = new File(baseDir + "/target/client.keytab");
+        kerbyServer.exportPrincipal(client, keytabFile);
+
+        kerbyServer.start();
+    }
 }
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java
index 70e62f8f7..b15d4ca3d 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java
@@ -17,6 +17,16 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x500.style.RFC4519Style;
+import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
+import org.bouncycastle.cert.X509v3CertificateBuilder;
+import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
+import org.bouncycastle.operator.ContentSigner;
+import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
+
 import java.io.File;
 import java.io.OutputStream;
 import java.math.BigInteger;
@@ -31,60 +41,47 @@ import java.util.Arrays;
 import java.util.Date;
 import java.util.Properties;
 
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.bouncycastle.asn1.x500.X500Name;
-import org.bouncycastle.asn1.x500.style.RFC4519Style;
-import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
-import org.bouncycastle.cert.X509v3CertificateBuilder;
-import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
-import org.bouncycastle.operator.ContentSigner;
-import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
-
 public final class KafkaTestUtils {
-    
-    public static String createAndStoreKey(String subjectName, String 
issuerName, BigInteger serial, String keystorePassword,
-               String keystoreAlias, String keyPassword, KeyStore trustStore) 
throws Exception {
-       
-       // Create KeyPair
-       KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA");
-       keyPairGenerator.initialize(2048, new SecureRandom());
-       KeyPair keyPair = keyPairGenerator.generateKeyPair();
-       
-       Date currentDate = new Date();
-       Date expiryDate = new Date(currentDate.getTime() + 365L * 24L * 60L * 
60L * 1000L);
-       
-       // Create X509Certificate
-       X509v3CertificateBuilder certBuilder =
-                       new X509v3CertificateBuilder(new 
X500Name(RFC4519Style.INSTANCE, issuerName), serial, currentDate, expiryDate, 
-                                       new X500Name(RFC4519Style.INSTANCE, 
subjectName), 
SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded()));
-       ContentSigner contentSigner = new 
JcaContentSignerBuilder("SHA256WithRSAEncryption").build(keyPair.getPrivate());
-       X509Certificate certificate = new 
JcaX509CertificateConverter().getCertificate(certBuilder.build(contentSigner));
-       
-       // Store Private Key + Certificate in Keystore
-       KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType());
-       keystore.load(null, keystorePassword.toCharArray());
-       keystore.setKeyEntry(keystoreAlias, keyPair.getPrivate(), 
keyPassword.toCharArray(), new Certificate[] {certificate});
-       
-       File keystoreFile = File.createTempFile("kafkakeystore", ".jks");
-                       
-       try (OutputStream output = 
Files.newOutputStream(keystoreFile.toPath())) {
-               keystore.store(output, keystorePassword.toCharArray());
-       }
-       
-       // Now store the Certificate in the truststore
-       trustStore.setCertificateEntry(keystoreAlias, certificate);
-       
-       return keystoreFile.getPath();
-       
+    private KafkaTestUtils(){
+        // to block instantiation
     }
 
-       static void createSomeTopics(Properties adminProps) {
-               try (AdminClient adminClient = AdminClient.create(adminProps)) {
-                       adminClient.createTopics(Arrays.asList(
-                                       new NewTopic("test", 1, (short) 1),
-                                       new NewTopic("dev", 1, (short) 1)
-                       ));
-               }
-       }
+    public static String createAndStoreKey(String subjectName, String issuerName, BigInteger serial, String keystorePassword, String keystoreAlias, String keyPassword, KeyStore trustStore) throws Exception {
+        // Create KeyPair
+        KeyPairGenerator keyPairGenerator = 
KeyPairGenerator.getInstance("RSA");
+        keyPairGenerator.initialize(2048, new SecureRandom());
+        KeyPair keyPair = keyPairGenerator.generateKeyPair();
+
+        Date currentDate = new Date();
+        Date expiryDate  = new Date(currentDate.getTime() + 365L * 24L * 60L * 
60L * 1000L);
+
+        // Create X509Certificate
+        X509v3CertificateBuilder certBuilder =
+                new X509v3CertificateBuilder(new 
X500Name(RFC4519Style.INSTANCE, issuerName), serial, currentDate, expiryDate,
+                        new X500Name(RFC4519Style.INSTANCE, subjectName), 
SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded()));
+        ContentSigner   contentSigner = new 
JcaContentSignerBuilder("SHA256WithRSAEncryption").build(keyPair.getPrivate());
+        X509Certificate certificate   = new 
JcaX509CertificateConverter().getCertificate(certBuilder.build(contentSigner));
+
+        // Store Private Key + Certificate in Keystore
+        KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType());
+        keystore.load(null, keystorePassword.toCharArray());
+        keystore.setKeyEntry(keystoreAlias, keyPair.getPrivate(), 
keyPassword.toCharArray(), new Certificate[] {certificate});
+
+        File keystoreFile = File.createTempFile("kafkakeystore", ".jks");
+
+        try (OutputStream output = 
Files.newOutputStream(keystoreFile.toPath())) {
+            keystore.store(output, keystorePassword.toCharArray());
+        }
+
+        // Now store the Certificate in the truststore
+        trustStore.setCertificateEntry(keystoreAlias, certificate);
+
+        return keystoreFile.getPath();
+    }
+
+    static void createSomeTopics(Properties adminProps) {
+        try (AdminClient adminClient = AdminClient.create(adminProps)) {
+            adminClient.createTopics(Arrays.asList(new NewTopic("test", 1, (short) 1), new NewTopic("dev", 1, (short) 1)));
+        }
+    }
 }
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/RangerAdminClientImpl.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/RangerAdminClientImpl.java
index 9117c6457..3693b2308 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/RangerAdminClientImpl.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/RangerAdminClientImpl.java
@@ -17,50 +17,48 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
+import org.apache.ranger.admin.client.AbstractRangerAdminClient;
+import org.apache.ranger.plugin.util.ServicePolicies;
+import org.apache.ranger.plugin.util.ServiceTags;
+
 import java.io.File;
 import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.util.List;
 
-import org.apache.ranger.admin.client.AbstractRangerAdminClient;
-import org.apache.ranger.plugin.util.ServicePolicies;
-import org.apache.ranger.plugin.util.ServiceTags;
-
 /**
  * A test implementation of the RangerAdminClient interface that just reads policies in from a file and returns them
  */
 public class RangerAdminClientImpl extends AbstractRangerAdminClient {
-    private final static String cacheFilename = "kafka-policies.json";
-    private final static String tagFilename = "kafka-policies-tag.json";
+    private static final String cacheFilename = "kafka-policies.json";
+    private static final String tagFilename   = "kafka-policies-tag.json";
 
     public ServicePolicies getServicePoliciesIfUpdated(long lastKnownVersion, 
long lastActivationTimeInMillis) throws Exception {
-
         String basedir = System.getProperty("basedir");
         if (basedir == null) {
             basedir = new File(".").getCanonicalPath();
         }
 
-        java.nio.file.Path cachePath = 
FileSystems.getDefault().getPath(basedir, "/src/test/resources/" + 
cacheFilename);
-        byte[] cacheBytes = Files.readAllBytes(cachePath);
+        java.nio.file.Path cachePath  = 
FileSystems.getDefault().getPath(basedir, "/src/test/resources/" + 
cacheFilename);
+        byte[]             cacheBytes = Files.readAllBytes(cachePath);
 
         return gson.fromJson(new String(cacheBytes), ServicePolicies.class);
     }
 
     public ServiceTags getServiceTagsIfUpdated(long lastKnownVersion, long 
lastActivationTimeInMillis) throws Exception {
         String basedir = System.getProperty("basedir");
+
         if (basedir == null) {
             basedir = new File(".").getCanonicalPath();
         }
 
-        java.nio.file.Path cachePath = 
FileSystems.getDefault().getPath(basedir, "/src/test/resources/" + tagFilename);
-        byte[] cacheBytes = Files.readAllBytes(cachePath);
+        java.nio.file.Path cachePath  = 
FileSystems.getDefault().getPath(basedir, "/src/test/resources/" + tagFilename);
+        byte[]             cacheBytes = Files.readAllBytes(cachePath);
 
         return gson.fromJson(new String(cacheBytes), ServiceTags.class);
     }
 
-    public List<String> getTagTypes(String tagTypePattern) throws Exception {
+    public List<String> getTagTypes(String tagTypePattern) {
         return null;
     }
-
-    
-}
\ No newline at end of file
+}

Reply via email to