This is an automated email from the ASF dual-hosted git repository.

mandarambawane pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new be7b20872 ATLAS-4975 : kafka-bridge module: update for code readability improvements (#380)
be7b20872 is described below

commit be7b20872eb1b1b81eb3cfbf7ddd9dbae1ae1df7
Author: mandarambawane <mandar.ambaw...@freestoneinfotech.com>
AuthorDate: Wed Jul 16 20:15:32 2025 +0530

    ATLAS-4975 : kafka-bridge module: update for code readability improvements (#380)
---
 addons/kafka-bridge/pom.xml                        |   5 +
 .../org/apache/atlas/kafka/bridge/KafkaBridge.java | 156 ++++++++++-----------
 .../kafka/bridge/SchemaRegistryConnector.java      |  31 ++--
 .../apache/atlas/kafka/bridge/KafkaBridgeTest.java |  11 +-
 4 files changed, 94 insertions(+), 109 deletions(-)

diff --git a/addons/kafka-bridge/pom.xml b/addons/kafka-bridge/pom.xml
index a1dd3a666..1f0f54eab 100644
--- a/addons/kafka-bridge/pom.xml
+++ b/addons/kafka-bridge/pom.xml
@@ -32,6 +32,11 @@
     <name>Apache Atlas Kafka Bridge</name>
     <description>Apache Atlas Kafka Bridge Module</description>
 
+    <properties>
+        <checkstyle.failOnViolation>true</checkstyle.failOnViolation>
+        <checkstyle.skip>false</checkstyle.skip>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>com.google.guava</groupId>
diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
index 0878ec29f..8223ce56c 100644
--- a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
+++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
@@ -29,6 +29,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.utils.AtlasConfigurationUtil;
 import org.apache.atlas.utils.AuthenticationUtil;
 import org.apache.atlas.utils.KafkaUtils;
+import org.apache.avro.Schema;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -38,7 +39,6 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.json.simple.JSONObject;
@@ -49,43 +49,41 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
-import org.apache.avro.Schema;
-import java.io.IOException;
 
 public class KafkaBridge {
-    private static final Logger LOG                               = LoggerFactory.getLogger(KafkaBridge.class);
-    private static final String KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE= System.getenv("KAFKA_SCHEMA_REGISTRY");
-    public static        String KAFKA_SCHEMA_REGISTRY_HOSTNAME    = "localhost";
-    private static final int    EXIT_CODE_SUCCESS                 = 0;
-    private static final int    EXIT_CODE_FAILED                  = 1;
-    private static final String ATLAS_ENDPOINT                    = "atlas.rest.address";
-    private static final String DEFAULT_ATLAS_URL                 = "http://localhost:21000/";
-    private static final String CLUSTER_NAME_KEY                  = "atlas.cluster.name";
-    private static final String KAFKA_METADATA_NAMESPACE          = "atlas.metadata.namespace";
-    private static final String DEFAULT_CLUSTER_NAME              = "primary";
-    private static final String ATTRIBUTE_QUALIFIED_NAME          = "qualifiedName";
-    private static final String DESCRIPTION_ATTR                  = "description";
-    private static final String PARTITION_COUNT                   = "partitionCount";
-    private static final String REPLICATION_FACTOR                = "replicationFactor";
-    private static final String NAME                              = "name";
-    private static final String URI                               = "uri";
-    private static final String CLUSTERNAME                       = "clusterName";
-    private static final String TOPIC                             = "topic";
-    private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME = "%s@%s";
-    private static final String TYPE                              = "type";
-    private static final String NAMESPACE                         = "namespace";
-    private static final String FIELDS                            = "fields";
-    private static final String AVRO_SCHEMA                       = "avroSchema";
-    private static final String SCHEMA_VERSION_ID                 = "versionId";
-
-    private static final String FORMAT_KAKFA_SCHEMA_QUALIFIED_NAME      = "%s@%s@%s";
-    private static final String FORMAT_KAKFA_FIELD_QUALIFIED_NAME       = "%s@%s@%s@%s";
+    private static final Logger LOG                                 = LoggerFactory.getLogger(KafkaBridge.class);
+    private static final String KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE  = System.getenv("KAFKA_SCHEMA_REGISTRY");
+    private static final int    EXIT_CODE_SUCCESS                   = 0;
+    private static final int    EXIT_CODE_FAILED                    = 1;
+    private static final String ATLAS_ENDPOINT                      = "atlas.rest.address";
+    private static final String DEFAULT_ATLAS_URL                   = "http://localhost:21000/";
+    private static final String CLUSTER_NAME_KEY                    = "atlas.cluster.name";
+    private static final String KAFKA_METADATA_NAMESPACE            = "atlas.metadata.namespace";
+    private static final String DEFAULT_CLUSTER_NAME                = "primary";
+    private static final String ATTRIBUTE_QUALIFIED_NAME            = "qualifiedName";
+    private static final String DESCRIPTION_ATTR                    = "description";
+    private static final String PARTITION_COUNT                     = "partitionCount";
+    private static final String REPLICATION_FACTOR                  = "replicationFactor";
+    private static final String NAME                                = "name";
+    private static final String URI                                 = "uri";
+    private static final String CLUSTERNAME                         = "clusterName";
+    private static final String TOPIC                               = "topic";
+    private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME   = "%s@%s";
+    private static final String TYPE                                = "type";
+    private static final String NAMESPACE                           = "namespace";
+    private static final String FIELDS                              = "fields";
+    private static final String AVRO_SCHEMA                         = "avroSchema";
+    private static final String SCHEMA_VERSION_ID                   = "versionId";
+
+    private static final String FORMAT_KAKFA_SCHEMA_QUALIFIED_NAME  = "%s@%s@%s";
+    private static final String FORMAT_KAKFA_FIELD_QUALIFIED_NAME   = "%s@%s@%s@%s";
 
     private final List<String>  availableTopics;
     private final String        metadataNamespace;
@@ -93,6 +91,8 @@ public class KafkaBridge {
     private final KafkaUtils    kafkaUtils;
     private final CloseableHttpClient httpClient;
 
+    public static String kafkaSchemaRegistryHostname                = "localhost";
+
     public static void main(String[] args) {
         int           exitCode      = EXIT_CODE_FAILED;
         AtlasClientV2 atlasClientV2 = null;
@@ -105,7 +105,7 @@ public class KafkaBridge {
 
         try {
             Options options = new Options();
-            options.addOption("t","topic", true, "topic");
+            options.addOption("t", "topic", true, "topic");
             options.addOption("f", "filename", true, "filename");
 
             CommandLineParser parser        = new BasicParser();
@@ -116,10 +116,9 @@ public class KafkaBridge {
             String[]          urls          = atlasConf.getStringArray(ATLAS_ENDPOINT);
 
             if (urls == null || urls.length == 0) {
-                urls = new String[] { DEFAULT_ATLAS_URL };
+                urls = new String[] {DEFAULT_ATLAS_URL};
             }
 
-
             if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
                 String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
 
@@ -134,8 +133,8 @@ public class KafkaBridge {
 
             KafkaBridge importer = new KafkaBridge(atlasConf, atlasClientV2, kafkaUtils);
 
-            if(StringUtils.isNotEmpty(KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE)) {
-                KAFKA_SCHEMA_REGISTRY_HOSTNAME = KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE;
+            if (StringUtils.isNotEmpty(KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE)) {
+                kafkaSchemaRegistryHostname = KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE;
             }
 
             if (StringUtils.isNotEmpty(fileToImport)) {
@@ -160,11 +159,11 @@ public class KafkaBridge {
 
                 exitCode = EXIT_CODE_SUCCESS;
             }
-        } catch(ParseException e) {
+        } catch (ParseException e) {
             LOG.error("Failed to parse arguments. Error: ", e.getMessage());
 
             printUsage();
-        } catch(Exception e) {
+        } catch (Exception e) {
             System.out.println("ImportKafkaEntities failed. Please check the 
log file for the detailed error message");
 
             e.printStackTrace();
@@ -213,15 +212,15 @@ public class KafkaBridge {
         List<String> topics = availableTopics;
 
         if (StringUtils.isNotEmpty(topicToImport)) {
-            List<String> topics_subset = new ArrayList<>();
+            List<String> topicsSubset = new ArrayList<>();
 
             for (String topic : topics) {
                 if (Pattern.compile(topicToImport).matcher(topic).matches()) {
-                    topics_subset.add(topic);
+                    topicsSubset.add(topic);
                 }
             }
 
-            topics = topics_subset;
+            topics = topicsSubset;
         }
 
         if (CollectionUtils.isNotEmpty(topics)) {
@@ -234,7 +233,7 @@ public class KafkaBridge {
     @VisibleForTesting
     AtlasEntityWithExtInfo createOrUpdateTopic(String topic) throws Exception {
         String                 topicQualifiedName = getTopicQualifiedName(metadataNamespace, topic);
-        AtlasEntityWithExtInfo topicEntity        = findEntityInAtlas(KafkaDataTypes.KAFKA_TOPIC.getName(),topicQualifiedName);
+        AtlasEntityWithExtInfo topicEntity        = findEntityInAtlas(KafkaDataTypes.KAFKA_TOPIC.getName(), topicQualifiedName);
 
         System.out.print("\n"); // add a new line for each topic
 
@@ -295,7 +294,7 @@ public class KafkaBridge {
             System.out.println("---Adding Avro field " + fullname);
             LOG.info("Importing Avro field: {}", fieldQualifiedName);
 
-            AtlasEntity entity = getFieldEntity(field, schemaName, namespace, version ,null, fullname);
+            AtlasEntity entity = getFieldEntity(field, schemaName, namespace, version, null, fullname);
 
             fieldEntity = createEntityInAtlas(new AtlasEntityWithExtInfo(entity));
         } else {
@@ -312,7 +311,6 @@ public class KafkaBridge {
         return fieldEntity;
     }
 
-
     @VisibleForTesting
     AtlasEntity getTopicEntity(String topic, AtlasEntity topicEntity) throws Exception {
         final AtlasEntity ret;
@@ -329,7 +327,7 @@ public class KafkaBridge {
         ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
         ret.setAttribute(CLUSTERNAME, metadataNamespace);
         ret.setAttribute(TOPIC, topic);
-        ret.setAttribute(NAME,topic);
+        ret.setAttribute(NAME, topic);
         ret.setAttribute(DESCRIPTION_ATTR, topic);
         ret.setAttribute(URI, topic);
 
@@ -344,7 +342,7 @@ public class KafkaBridge {
 
         createdSchemas = findOrCreateAtlasSchema(topic);
 
-        if(createdSchemas.size() > 0) {
+        if (createdSchemas.size() > 0) {
             ret.setAttribute(AVRO_SCHEMA, createdSchemas);
             ret.setRelationshipAttribute(AVRO_SCHEMA, createdSchemas);
         }
@@ -357,7 +355,6 @@ public class KafkaBridge {
         final AtlasEntity ret;
         List<AtlasEntity> createdFields = new ArrayList<>();
 
-
         if (schemaEntity == null) {
             ret = new AtlasEntity(KafkaDataTypes.AVRO_SCHEMA.getName());
         } else {
@@ -375,12 +372,12 @@ public class KafkaBridge {
         ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
         ret.setAttribute(TYPE, parsedSchema.getType());
         ret.setAttribute(NAMESPACE, namespace);
-        ret.setAttribute(NAME,parsedSchema.getName() + "(v" + version + ")");
+        ret.setAttribute(NAME, parsedSchema.getName() + "(v" + version + ")");
         ret.setAttribute(SCHEMA_VERSION_ID, version);
 
         createdFields = createNestedFields(parsedSchema, schemaName, namespace, version, "");
 
-        if(createdFields.size() > 0) {
+        if (createdFields.size() > 0) {
             ret.setRelationshipAttribute(FIELDS, createdFields);
         }
 
@@ -392,9 +389,8 @@ public class KafkaBridge {
         AtlasEntityWithExtInfo fieldInAtlas;
         JSONParser parser = new JSONParser();
 
-        for (Schema.Field field:parsedSchema.getFields()) {
-
-            if(field.schema().getType() == Schema.Type.ARRAY){
+        for (Schema.Field field : parsedSchema.getFields()) {
+            if (field.schema().getType() == Schema.Type.ARRAY) {
                 System.out.println("ARRAY DETECTED");
                 String subfields = ((JSONObject) parser.parse(field.schema().toString())).get("items").toString();
                 Schema parsedSubSchema = new Schema.Parser().parse(subfields);
@@ -402,15 +398,11 @@ public class KafkaBridge {
                 fullname = concatFullname(field.name(), fullname, parsedSubSchema.getName());
 
                 entityArray.addAll(createNestedFields(parsedSubSchema, schemaName, namespace, version, fullname));
-            }
-
-            else if(field.schema().getType() == Schema.Type.RECORD && !schemaName.equals(field.name())) {
-                    System.out.println("NESTED RECORD DETECTED");
-                    fullname = concatFullname(field.name(), fullname, "");
-                    entityArray.addAll(createNestedFields(field.schema(), schemaName, namespace, version, fullname));
-            }
-
-            else{
+            } else if (field.schema().getType() == Schema.Type.RECORD && !schemaName.equals(field.name())) {
+                System.out.println("NESTED RECORD DETECTED");
+                fullname = concatFullname(field.name(), fullname, "");
+                entityArray.addAll(createNestedFields(field.schema(), schemaName, namespace, version, fullname));
+            } else {
                 fieldInAtlas = createOrUpdateField(field, schemaName, namespace, version, fullname);
 
                 entityArray.add(fieldInAtlas.getEntity());
@@ -443,7 +435,7 @@ public class KafkaBridge {
         String qualifiedName = getFieldQualifiedName(metadataNamespace, fullname, schemaName + "-value", "v" + version);
 
         ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
-        ret.setAttribute(NAME,fullname + "(v" + version + ")");
+        ret.setAttribute(NAME, fullname + "(v" + version + ")");
         //ret.setAttribute(field.schema().getType()); --> does not work, since type expects array<avro_type>. Instead setting Description
         ret.setAttribute(DESCRIPTION_ATTR, field.schema().getType());
         return ret;
@@ -461,7 +453,7 @@ public class KafkaBridge {
 
     @VisibleForTesting
     static String getFieldQualifiedName(String metadataNamespace, String field, String schemaName, String version) {
-        return String.format(FORMAT_KAKFA_FIELD_QUALIFIED_NAME , field.toLowerCase(), schemaName.toLowerCase(), version, metadataNamespace);
+        return String.format(FORMAT_KAKFA_FIELD_QUALIFIED_NAME, field.toLowerCase(), schemaName.toLowerCase(), version, metadataNamespace);
     }
 
     @VisibleForTesting
@@ -470,8 +462,7 @@ public class KafkaBridge {
 
         try {
             ret = atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
-        }
-        catch (Exception e){
+        } catch (Exception e) {
             LOG.info("Exception on finding Atlas Entity: {}", e);
         }
 
@@ -509,12 +500,12 @@ public class KafkaBridge {
 
                 LOG.info("Updated {} entity: name={}, guid={} ", 
ret.getEntity().getTypeName(), 
ret.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), 
ret.getEntity().getGuid());
             } else {
-                LOG.info("Entity: name={} ", entity.toString() + " not updated 
as it is unchanged from what is in Atlas" );
+                LOG.info("Entity: name={} ", entity.toString() + " not updated 
as it is unchanged from what is in Atlas");
 
                 ret = entity;
             }
         } else {
-            LOG.info("Entity: name={} ", entity.toString() + " not updated as 
it is unchanged from what is in Atlas" );
+            LOG.info("Entity: name={} ", entity.toString() + " not updated as 
it is unchanged from what is in Atlas");
 
             ret = entity;
         }
@@ -522,17 +513,16 @@ public class KafkaBridge {
         return ret;
     }
 
-    private static  void printUsage(){
+    private static void printUsage() {
         System.out.println("Usage 1: import-kafka.sh");
         System.out.println("Usage 2: import-kafka.sh [-t <topic regex> OR 
--topic <topic regex>]");
-        System.out.println("Usage 3: import-kafka.sh [-f <filename>]" );
+        System.out.println("Usage 3: import-kafka.sh [-f <filename>]");
         System.out.println("   Format:");
         System.out.println("        topic1 OR topic1 regex");
         System.out.println("        topic2 OR topic2 regex");
         System.out.println("        topic3 OR topic3 regex");
     }
 
-
     private void clearRelationshipAttributes(AtlasEntityWithExtInfo entity) {
         if (entity != null) {
             clearRelationshipAttributes(entity.getEntity());
@@ -560,19 +550,19 @@ public class KafkaBridge {
     private List<AtlasEntity> findOrCreateAtlasSchema(String schemaName) throws Exception {
         List<AtlasEntity> entities = new ArrayList<>();
         // Handling Schemas
-        ArrayList<Integer> versions = SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(httpClient,schemaName + "-value");
+        ArrayList<Integer> versions = SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(httpClient, schemaName + "-value");
 
-        for (int version:versions) {
+        for (int version : versions) {
             String kafkaSchema = SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(httpClient, schemaName + "-value", version);
 
-            if(kafkaSchema != null) {
+            if (kafkaSchema != null) {
                 // Schema exists in Kafka Schema Registry
                 System.out.println("---Found Schema " + schemaName + "-value 
in Kafka Schema Registry with Version " + version);
                 LOG.info("Found Schema {}-value in Kafka Schema Registry with 
Version {}", schemaName, version);
 
                 AtlasEntityWithExtInfo atlasSchemaEntity = findEntityInAtlas(KafkaDataTypes.AVRO_SCHEMA.getName(), getSchemaQualifiedName(metadataNamespace, schemaName  + "-value", "v" + version));
 
-                if(atlasSchemaEntity != null) {
+                if (atlasSchemaEntity != null) {
                     // Schema exists in Kafka Schema Registry AND in Atlas
 
                     System.out.println("---Found Entity avro_schema " + 
schemaName + " in Atlas");
@@ -597,25 +587,21 @@ public class KafkaBridge {
         return entities;
     }
 
-    private String concatFullname(String fieldName,String fullname, String subSchemaName){
-        if(fullname.isEmpty()){
-            if(subSchemaName.isEmpty()) {
+    private String concatFullname(String fieldName, String fullname, String subSchemaName) {
+        if (fullname.isEmpty()) {
+            if (subSchemaName.isEmpty()) {
                 fullname = fieldName;
-            }
-            else {
+            } else {
                 fullname = fieldName + "." + subSchemaName;
             }
-
-        }
-        else{
-            if(subSchemaName.isEmpty()) {
+        } else {
+            if (subSchemaName.isEmpty()) {
                 fullname = fullname + "." + fieldName;
-            }
-            else {
+            } else {
                 fullname = fullname + "." + subSchemaName + "." + fieldName;
             }
         }
 
         return fullname;
     }
-}
\ No newline at end of file
+}
diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnector.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnector.java
index d5d85d6b4..958d2a2aa 100644
--- a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnector.java
+++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnector.java
@@ -36,14 +36,18 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 
 public class SchemaRegistryConnector {
-    private static final String SCHEMA_KEY = "schema";
-    private static final Logger LOG        = LoggerFactory.getLogger(SchemaRegistryConnector.class);
+    private static final String SCHEMA_KEY  = "schema";
+    private static final Logger LOG         = LoggerFactory.getLogger(SchemaRegistryConnector.class);
+
+    private SchemaRegistryConnector() {
+        //private constructor
+    }
 
     static ArrayList<Integer> getVersionsKafkaSchemaRegistry(CloseableHttpClient httpClient, String subject) throws IOException {
         ArrayList<Integer> list = new ArrayList<>();
         JSONParser parser = new JSONParser();
 
-        HttpGet getRequest = new HttpGet("http://" + KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME + "/subjects/" + subject + "/versions/");
+        HttpGet getRequest = new HttpGet("http://" + KafkaBridge.kafkaSchemaRegistryHostname + "/subjects/" + subject + "/versions/");
         getRequest.addHeader("accept", "application/json");
         getRequest.addHeader("Content-Type", 
"application/vnd.schemaregistry.v1+json");
 
@@ -72,7 +76,6 @@ public class SchemaRegistryConnector {
                     System.out.println("---Error reading versions to schema: " 
+ subject + " in Kafka");
                     LOG.error("Error reading versions to schema: " + subject + 
" in Kafka: ", e.getMessage());
                 }
-
             } else if (response.getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND) {
                 // did not find any schema to the topic
                 System.out.println("---No schema versions found for schema: " 
+ subject + " in Schema Registry");
@@ -85,8 +88,7 @@ public class SchemaRegistryConnector {
 
             EntityUtils.consumeQuietly(response.getEntity());
             response.close();
-        }
-        catch(Exception e) {
+        } catch (Exception e) {
             System.out.println("---Error getting versions to schema: " + 
subject + " from Kafka");
             LOG.error("Error getting versions to schema: " + subject + " from 
Kafka: ", e);
         }
@@ -94,14 +96,14 @@ public class SchemaRegistryConnector {
     }
 
     static String getSchemaFromKafkaSchemaRegistry(CloseableHttpClient httpClient, String subject, int version) throws IOException {
-        HttpGet getRequest = new HttpGet("http://" + KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME + "/subjects/" + subject + "/versions/" + version);
+        HttpGet getRequest = new HttpGet("http://" + KafkaBridge.kafkaSchemaRegistryHostname + "/subjects/" + subject + "/versions/" + version);
         getRequest.addHeader("accept", "application/json");
         getRequest.addHeader("Content-Type", 
"application/vnd.schemaregistry.v1+json");
         JSONParser parser = new JSONParser();
 
         CloseableHttpResponse response = httpClient.execute(getRequest);
 
-        if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK){
+        if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
             //found corresponding Schema in Registry
             try {
                 BufferedReader br = new BufferedReader(
@@ -116,19 +118,14 @@ public class SchemaRegistryConnector {
                 System.out.println("---Error reading versions to schema: " + 
subject + " in Kafka");
                 LOG.error("Error reading versions to schema: " + subject + " 
in Kafka: ", e);
             }
-
-        }
-
-        else if (response.getStatusLine().getStatusCode() == 404) {
+        } else if (response.getStatusLine().getStatusCode() == 404) {
             // did not find any schema to the topic
             System.out.println("---Cannot find the corresponding schema to: " 
+ subject + "in Kafka");
             LOG.info("Cannot find the corresponding schema to: {} in Kafka", 
subject);
-        }
-
-        else {
+        } else {
             // any other error when connecting to schema registry
-            System.out.println("---Cannot connect to schema registry at: " + 
KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME);
-            LOG.warn("Cannot connect to schema registry at: {}", 
KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME);
+            System.out.println("---Cannot connect to schema registry at: " + 
KafkaBridge.kafkaSchemaRegistryHostname);
+            LOG.warn("Cannot connect to schema registry at: {}", 
KafkaBridge.kafkaSchemaRegistryHostname);
         }
 
         EntityUtils.consumeQuietly(response.getEntity());
diff --git a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
index adbaddd18..ece25edb7 100644
--- a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
+++ b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
@@ -28,8 +28,8 @@ import org.apache.atlas.utils.KafkaUtils;
 import org.apache.avro.Schema;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpStatus;
-import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.conn.ClientConnectionManager;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.mockito.ArgumentCaptor;
@@ -43,7 +43,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 
-
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
@@ -52,7 +51,6 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 
 public class KafkaBridgeTest {
-
     private static final String TEST_TOPIC_NAME = "test_topic";
     private static final String CLUSTER_NAME = "primary";
     private static final String TOPIC_QUALIFIED_NAME = KafkaBridge.getTopicQualifiedName(CLUSTER_NAME, TEST_TOPIC_NAME);
@@ -309,7 +307,7 @@ public class KafkaBridgeTest {
         when(mockResponse.getEntity())
                 .thenReturn(mock(HttpEntity.class));
         when(mockResponse.getEntity().getContent())
-                .thenReturn(new ByteArrayInputStream(new String("{\"subject\":\"test-value\",\"version\":1,\"id\":1,\"schema\":"+ TEST_SCHEMA +"}").getBytes(StandardCharsets.UTF_8)));
+                .thenReturn(new ByteArrayInputStream(new String("{\"subject\":\"test-value\",\"version\":1,\"id\":1,\"schema\":" + TEST_SCHEMA + "}").getBytes(StandardCharsets.UTF_8)));
 
         CloseableHttpClient mockHttpClient = mock(CloseableHttpClient.class);
         when(mockHttpClient.execute(any()))
@@ -317,7 +315,7 @@ public class KafkaBridgeTest {
         when(mockHttpClient.getConnectionManager())
                 .thenReturn(mock(ClientConnectionManager.class));
 
-        String ret = SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(mockHttpClient, TEST_SCHEMA_NAME,TEST_SCHEMA_VERSION);
+        String ret = SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(mockHttpClient, TEST_SCHEMA_NAME, TEST_SCHEMA_VERSION);
 
         assertEquals(TEST_SCHEMA, ret);
     }
@@ -344,5 +342,4 @@ public class KafkaBridgeTest {
 
         assertEquals(TEST_SCHEMA_VERSION_LIST, ret);
     }
-
-}
\ No newline at end of file
+}
