This is an automated email from the ASF dual-hosted git repository. mandarambawane pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new be7b20872 ATLAS-4975 : kafka-bridge module: update for code readability improvements (#380) be7b20872 is described below commit be7b20872eb1b1b81eb3cfbf7ddd9dbae1ae1df7 Author: mandarambawane <mandar.ambaw...@freestoneinfotech.com> AuthorDate: Wed Jul 16 20:15:32 2025 +0530 ATLAS-4975 : kafka-bridge module: update for code readability improvements (#380) --- addons/kafka-bridge/pom.xml | 5 + .../org/apache/atlas/kafka/bridge/KafkaBridge.java | 156 ++++++++++----------- .../kafka/bridge/SchemaRegistryConnector.java | 31 ++-- .../apache/atlas/kafka/bridge/KafkaBridgeTest.java | 11 +- 4 files changed, 94 insertions(+), 109 deletions(-) diff --git a/addons/kafka-bridge/pom.xml b/addons/kafka-bridge/pom.xml index a1dd3a666..1f0f54eab 100644 --- a/addons/kafka-bridge/pom.xml +++ b/addons/kafka-bridge/pom.xml @@ -32,6 +32,11 @@ <name>Apache Atlas Kafka Bridge</name> <description>Apache Atlas Kafka Bridge Module</description> + <properties> + <checkstyle.failOnViolation>true</checkstyle.failOnViolation> + <checkstyle.skip>false</checkstyle.skip> + </properties> + <dependencies> <dependency> <groupId>com.google.guava</groupId> diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java index 0878ec29f..8223ce56c 100644 --- a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java +++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java @@ -29,6 +29,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.utils.AtlasConfigurationUtil; import org.apache.atlas.utils.AuthenticationUtil; import org.apache.atlas.utils.KafkaUtils; +import org.apache.avro.Schema; import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -38,7 +39,6 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.security.UserGroupInformation; - import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.json.simple.JSONObject; @@ -49,43 +49,41 @@ import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; -import org.apache.avro.Schema; -import java.io.IOException; public class KafkaBridge { - private static final Logger LOG = LoggerFactory.getLogger(KafkaBridge.class); - private static final String KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE= System.getenv("KAFKA_SCHEMA_REGISTRY"); - public static String KAFKA_SCHEMA_REGISTRY_HOSTNAME = "localhost"; - private static final int EXIT_CODE_SUCCESS = 0; - private static final int EXIT_CODE_FAILED = 1; - private static final String ATLAS_ENDPOINT = "atlas.rest.address"; - private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; - private static final String CLUSTER_NAME_KEY = "atlas.cluster.name"; - private static final String KAFKA_METADATA_NAMESPACE = "atlas.metadata.namespace"; - private static final String DEFAULT_CLUSTER_NAME = "primary"; - private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; - private static final String DESCRIPTION_ATTR = "description"; - private static final String PARTITION_COUNT = "partitionCount"; - private static final String REPLICATION_FACTOR = "replicationFactor"; - private static final String NAME = "name"; - private static final String URI = "uri"; - private static final String CLUSTERNAME = "clusterName"; - private static final String TOPIC = "topic"; - private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME = "%s@%s"; - private static final String TYPE = "type"; - private static final String NAMESPACE = "namespace"; - private static final String FIELDS = "fields"; - private static final String AVRO_SCHEMA = "avroSchema"; - private static final String SCHEMA_VERSION_ID = "versionId"; - - private static final String FORMAT_KAKFA_SCHEMA_QUALIFIED_NAME = "%s@%s@%s"; - private static final String FORMAT_KAKFA_FIELD_QUALIFIED_NAME = "%s@%s@%s@%s"; + private static final Logger LOG = LoggerFactory.getLogger(KafkaBridge.class); + private static final String KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE = System.getenv("KAFKA_SCHEMA_REGISTRY"); + private static final int EXIT_CODE_SUCCESS = 0; + private static final int EXIT_CODE_FAILED = 1; + private static final String ATLAS_ENDPOINT = "atlas.rest.address"; + private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; + private static final String CLUSTER_NAME_KEY = "atlas.cluster.name"; + private static final String KAFKA_METADATA_NAMESPACE = "atlas.metadata.namespace"; + private static final String DEFAULT_CLUSTER_NAME = "primary"; + private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; + private static final String DESCRIPTION_ATTR = "description"; + private static final String PARTITION_COUNT = "partitionCount"; + private static final String REPLICATION_FACTOR = "replicationFactor"; + private static final String NAME = "name"; + private static final String URI = "uri"; + private static final String CLUSTERNAME = "clusterName"; + private static final String TOPIC = "topic"; + private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME = "%s@%s"; + private static final String TYPE = "type"; + private static final String NAMESPACE = "namespace"; + private static final String FIELDS = "fields"; + private static final String AVRO_SCHEMA = "avroSchema"; + private static final String SCHEMA_VERSION_ID = "versionId"; + + private static final String FORMAT_KAKFA_SCHEMA_QUALIFIED_NAME = "%s@%s@%s"; + private static final String FORMAT_KAKFA_FIELD_QUALIFIED_NAME = "%s@%s@%s@%s"; private final List<String> availableTopics; private final String metadataNamespace; @@ -93,6 +91,8 @@ public class KafkaBridge { private final KafkaUtils kafkaUtils; private final CloseableHttpClient httpClient; + public static String kafkaSchemaRegistryHostname = "localhost"; + public static void main(String[] args) { int exitCode = EXIT_CODE_FAILED; AtlasClientV2 atlasClientV2 = null; @@ -105,7 +105,7 @@ public class KafkaBridge { try { Options options = new Options(); - options.addOption("t","topic", true, "topic"); + options.addOption("t", "topic", true, "topic"); options.addOption("f", "filename", true, "filename"); CommandLineParser parser = new BasicParser(); @@ -116,10 +116,9 @@ public class KafkaBridge { String[] urls = atlasConf.getStringArray(ATLAS_ENDPOINT); if (urls == null || urls.length == 0) { - urls = new String[] { DEFAULT_ATLAS_URL }; + urls = new String[] {DEFAULT_ATLAS_URL}; } - if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) { String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput(); @@ -134,8 +133,8 @@ public class KafkaBridge { KafkaBridge importer = new KafkaBridge(atlasConf, atlasClientV2, kafkaUtils); - if(StringUtils.isNotEmpty(KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE)) { - KAFKA_SCHEMA_REGISTRY_HOSTNAME = KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE; + if (StringUtils.isNotEmpty(KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE)) { + kafkaSchemaRegistryHostname = KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE; } if (StringUtils.isNotEmpty(fileToImport)) { @@ -160,11 +159,11 @@ public class KafkaBridge { exitCode = EXIT_CODE_SUCCESS; } - } catch(ParseException e) { + } catch (ParseException e) { LOG.error("Failed to parse arguments. Error: ", e.getMessage()); printUsage(); - } catch(Exception e) { + } catch (Exception e) { System.out.println("ImportKafkaEntities failed. Please check the log file for the detailed error message"); e.printStackTrace(); @@ -213,15 +212,15 @@ public class KafkaBridge { List<String> topics = availableTopics; if (StringUtils.isNotEmpty(topicToImport)) { - List<String> topics_subset = new ArrayList<>(); + List<String> topicsSubset = new ArrayList<>(); for (String topic : topics) { if (Pattern.compile(topicToImport).matcher(topic).matches()) { - topics_subset.add(topic); + topicsSubset.add(topic); } } - topics = topics_subset; + topics = topicsSubset; } if (CollectionUtils.isNotEmpty(topics)) { @@ -234,7 +233,7 @@ public class KafkaBridge { @VisibleForTesting AtlasEntityWithExtInfo createOrUpdateTopic(String topic) throws Exception { String topicQualifiedName = getTopicQualifiedName(metadataNamespace, topic); - AtlasEntityWithExtInfo topicEntity = findEntityInAtlas(KafkaDataTypes.KAFKA_TOPIC.getName(),topicQualifiedName); + AtlasEntityWithExtInfo topicEntity = findEntityInAtlas(KafkaDataTypes.KAFKA_TOPIC.getName(), topicQualifiedName); System.out.print("\n"); // add a new line for each topic @@ -295,7 +294,7 @@ public class KafkaBridge { System.out.println("---Adding Avro field " + fullname); LOG.info("Importing Avro field: {}", fieldQualifiedName); - AtlasEntity entity = getFieldEntity(field, schemaName, namespace, version ,null, fullname); + AtlasEntity entity = getFieldEntity(field, schemaName, namespace, version, null, fullname); fieldEntity = createEntityInAtlas(new AtlasEntityWithExtInfo(entity)); } else { @@ -312,7 +311,6 @@ public class KafkaBridge { return fieldEntity; } - @VisibleForTesting AtlasEntity getTopicEntity(String topic, AtlasEntity topicEntity) throws Exception { final AtlasEntity ret; @@ -329,7 +327,7 @@ public class KafkaBridge { ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName); ret.setAttribute(CLUSTERNAME, metadataNamespace); ret.setAttribute(TOPIC, topic); - ret.setAttribute(NAME,topic); + ret.setAttribute(NAME, topic); ret.setAttribute(DESCRIPTION_ATTR, topic); ret.setAttribute(URI, topic); @@ -344,7 +342,7 @@ public class KafkaBridge { createdSchemas = findOrCreateAtlasSchema(topic); - if(createdSchemas.size() > 0) { + if (createdSchemas.size() > 0) { ret.setAttribute(AVRO_SCHEMA, createdSchemas); ret.setRelationshipAttribute(AVRO_SCHEMA, createdSchemas); } @@ -357,7 +355,6 @@ public class KafkaBridge { final AtlasEntity ret; List<AtlasEntity> createdFields = new ArrayList<>(); - if (schemaEntity == null) { ret = new AtlasEntity(KafkaDataTypes.AVRO_SCHEMA.getName()); } else { @@ -375,12 +372,12 @@ public class KafkaBridge { ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName); ret.setAttribute(TYPE, parsedSchema.getType()); ret.setAttribute(NAMESPACE, namespace); - ret.setAttribute(NAME,parsedSchema.getName() + "(v" + version + ")"); + ret.setAttribute(NAME, parsedSchema.getName() + "(v" + version + ")"); ret.setAttribute(SCHEMA_VERSION_ID, version); createdFields = createNestedFields(parsedSchema, schemaName, namespace, version, ""); - if(createdFields.size() > 0) { + if (createdFields.size() > 0) { ret.setRelationshipAttribute(FIELDS, createdFields); } @@ -392,9 +389,8 @@ public class KafkaBridge { AtlasEntityWithExtInfo fieldInAtlas; JSONParser parser = new JSONParser(); - for (Schema.Field field:parsedSchema.getFields()) { - - if(field.schema().getType() == Schema.Type.ARRAY){ + for (Schema.Field field : parsedSchema.getFields()) { + if (field.schema().getType() == Schema.Type.ARRAY) { System.out.println("ARRAY DETECTED"); String subfields = ((JSONObject) parser.parse(field.schema().toString())).get("items").toString(); Schema parsedSubSchema = new Schema.Parser().parse(subfields); @@ -402,15 +398,11 @@ public class KafkaBridge { fullname = concatFullname(field.name(), fullname, parsedSubSchema.getName()); entityArray.addAll(createNestedFields(parsedSubSchema, schemaName, namespace, version, fullname)); - } - - else if(field.schema().getType() == Schema.Type.RECORD && !schemaName.equals(field.name())) { - System.out.println("NESTED RECORD DETECTED"); - fullname = concatFullname(field.name(), fullname, ""); - entityArray.addAll(createNestedFields(field.schema(), schemaName, namespace, version, fullname)); - } - - else{ + } else if (field.schema().getType() == Schema.Type.RECORD && !schemaName.equals(field.name())) { + System.out.println("NESTED RECORD DETECTED"); + fullname = concatFullname(field.name(), fullname, ""); + entityArray.addAll(createNestedFields(field.schema(), schemaName, namespace, version, fullname)); + } else { fieldInAtlas = createOrUpdateField(field, schemaName, namespace, version, fullname); entityArray.add(fieldInAtlas.getEntity()); @@ -443,7 +435,7 @@ public class KafkaBridge { String qualifiedName = getFieldQualifiedName(metadataNamespace, fullname, schemaName + "-value", "v" + version); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName); - ret.setAttribute(NAME,fullname + "(v" + version + ")"); + ret.setAttribute(NAME, fullname + "(v" + version + ")"); //ret.setAttribute(field.schema().getType()); --> does not work, since type expects array<avro_type>. Instead setting Description ret.setAttribute(DESCRIPTION_ATTR, field.schema().getType()); return ret; @@ -461,7 +453,7 @@ public class KafkaBridge { @VisibleForTesting static String getFieldQualifiedName(String metadataNamespace, String field, String schemaName, String version) { - return String.format(FORMAT_KAKFA_FIELD_QUALIFIED_NAME , field.toLowerCase(), schemaName.toLowerCase(), version, metadataNamespace); + return String.format(FORMAT_KAKFA_FIELD_QUALIFIED_NAME, field.toLowerCase(), schemaName.toLowerCase(), version, metadataNamespace); } @VisibleForTesting @@ -470,8 +462,7 @@ public class KafkaBridge { try { ret = atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName)); - } - catch (Exception e){ + } catch (Exception e) { LOG.info("Exception on finding Atlas Entity: {}", e); } @@ -509,12 +500,12 @@ public class KafkaBridge { LOG.info("Updated {} entity: name={}, guid={} ", ret.getEntity().getTypeName(), ret.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), ret.getEntity().getGuid()); } else { - LOG.info("Entity: name={} ", entity.toString() + " not updated as it is unchanged from what is in Atlas" ); + LOG.info("Entity: name={} ", entity.toString() + " not updated as it is unchanged from what is in Atlas"); ret = entity; } } else { - LOG.info("Entity: name={} ", entity.toString() + " not updated as it is unchanged from what is in Atlas" ); + LOG.info("Entity: name={} ", entity.toString() + " not updated as it is unchanged from what is in Atlas"); ret = entity; } @@ -522,17 +513,16 @@ public class KafkaBridge { return ret; } - private static void printUsage(){ + private static void printUsage() { System.out.println("Usage 1: import-kafka.sh"); System.out.println("Usage 2: import-kafka.sh [-t <topic regex> OR --topic <topic regex>]"); - System.out.println("Usage 3: import-kafka.sh [-f <filename>]" ); + System.out.println("Usage 3: import-kafka.sh [-f <filename>]"); System.out.println(" Format:"); System.out.println(" topic1 OR topic1 regex"); System.out.println(" topic2 OR topic2 regex"); System.out.println(" topic3 OR topic3 regex"); } - private void clearRelationshipAttributes(AtlasEntityWithExtInfo entity) { if (entity != null) { clearRelationshipAttributes(entity.getEntity()); @@ -560,19 +550,19 @@ public class KafkaBridge { private List<AtlasEntity> findOrCreateAtlasSchema(String schemaName) throws Exception { List<AtlasEntity> entities = new ArrayList<>(); // Handling Schemas - ArrayList<Integer> versions = SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(httpClient,schemaName + "-value"); + ArrayList<Integer> versions = SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(httpClient, schemaName + "-value"); - for (int version:versions) { + for (int version : versions) { String kafkaSchema = SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(httpClient, schemaName + "-value", version); - if(kafkaSchema != null) { + if (kafkaSchema != null) { // Schema exists in Kafka Schema Registry System.out.println("---Found Schema " + schemaName + "-value in Kafka Schema Registry with Version " + version); LOG.info("Found Schema {}-value in Kafka Schema Registry with Version {}", schemaName, version); AtlasEntityWithExtInfo atlasSchemaEntity = findEntityInAtlas(KafkaDataTypes.AVRO_SCHEMA.getName(), getSchemaQualifiedName(metadataNamespace, schemaName + "-value", "v" + version)); - if(atlasSchemaEntity != null) { + if (atlasSchemaEntity != null) { // Schema exists in Kafka Schema Registry AND in Atlas System.out.println("---Found Entity avro_schema " + schemaName + " in Atlas"); @@ -597,25 +587,21 @@ public class KafkaBridge { return entities; } - private String concatFullname(String fieldName,String fullname, String subSchemaName){ - if(fullname.isEmpty()){ - if(subSchemaName.isEmpty()) { + private String concatFullname(String fieldName, String fullname, String subSchemaName) { + if (fullname.isEmpty()) { + if (subSchemaName.isEmpty()) { fullname = fieldName; - } - else { + } else { fullname = fieldName + "." + subSchemaName; } - - } - else{ - if(subSchemaName.isEmpty()) { + } else { + if (subSchemaName.isEmpty()) { fullname = fullname + "." + fieldName; - } - else { + } else { fullname = fullname + "." + subSchemaName + "." + fieldName; } } return fullname; } -} \ No newline at end of file +} diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnector.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnector.java index d5d85d6b4..958d2a2aa 100644 --- a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnector.java +++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnector.java @@ -36,14 +36,18 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; public class SchemaRegistryConnector { - private static final String SCHEMA_KEY = "schema"; - private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistryConnector.class); + private static final String SCHEMA_KEY = "schema"; + private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistryConnector.class); + + private SchemaRegistryConnector() { + //private constructor + } static ArrayList<Integer> getVersionsKafkaSchemaRegistry(CloseableHttpClient httpClient, String subject) throws IOException { ArrayList<Integer> list = new ArrayList<>(); JSONParser parser = new JSONParser(); - HttpGet getRequest = new HttpGet("http://" + KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME + "/subjects/" + subject + "/versions/"); + HttpGet getRequest = new HttpGet("http://" + KafkaBridge.kafkaSchemaRegistryHostname + "/subjects/" + subject + "/versions/"); getRequest.addHeader("accept", "application/json"); getRequest.addHeader("Content-Type", "application/vnd.schemaregistry.v1+json"); @@ -72,7 +76,6 @@ public class SchemaRegistryConnector { System.out.println("---Error reading versions to schema: " + subject + " in Kafka"); LOG.error("Error reading versions to schema: " + subject + " in Kafka: ", e.getMessage()); } - } else if (response.getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND) { // did not find any schema to the topic System.out.println("---No schema versions found for schema: " + subject + " in Schema Registry"); @@ -85,8 +88,7 @@ public class SchemaRegistryConnector { EntityUtils.consumeQuietly(response.getEntity()); response.close(); - } - catch(Exception e) { + } catch (Exception e) { System.out.println("---Error getting versions to schema: " + subject + " from Kafka"); LOG.error("Error getting versions to schema: " + subject + " from Kafka: ", e); } @@ -94,14 +96,14 @@ public class SchemaRegistryConnector { } static String getSchemaFromKafkaSchemaRegistry(CloseableHttpClient httpClient, String subject, int version) throws IOException { - HttpGet getRequest = new HttpGet("http://" + KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME + "/subjects/" + subject + "/versions/" + version); + HttpGet getRequest = new HttpGet("http://" + KafkaBridge.kafkaSchemaRegistryHostname + "/subjects/" + subject + "/versions/" + version); getRequest.addHeader("accept", "application/json"); getRequest.addHeader("Content-Type", "application/vnd.schemaregistry.v1+json"); JSONParser parser = new JSONParser(); CloseableHttpResponse response = httpClient.execute(getRequest); - if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK){ + if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { //found corresponding Schema in Registry try { BufferedReader br = new BufferedReader( @@ -116,19 +118,14 @@ public class SchemaRegistryConnector { System.out.println("---Error reading versions to schema: " + subject + " in Kafka"); LOG.error("Error reading versions to schema: " + subject + " in Kafka: ", e); } - - } - - else if (response.getStatusLine().getStatusCode() == 404) { + } else if (response.getStatusLine().getStatusCode() == 404) { // did not find any schema to the topic System.out.println("---Cannot find the corresponding schema to: " + subject + "in Kafka"); LOG.info("Cannot find the corresponding schema to: {} in Kafka", subject); - } - - else { + } else { // any other error when connecting to schema registry - System.out.println("---Cannot connect to schema registry at: " + KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME); - LOG.warn("Cannot connect to schema registry at: {}", KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME); + System.out.println("---Cannot connect to schema registry at: " + KafkaBridge.kafkaSchemaRegistryHostname); + LOG.warn("Cannot connect to schema registry at: {}", KafkaBridge.kafkaSchemaRegistryHostname); } EntityUtils.consumeQuietly(response.getEntity()); diff --git a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java index adbaddd18..ece25edb7 100644 --- a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java +++ b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java @@ -28,8 +28,8 @@ import org.apache.atlas.utils.KafkaUtils; import org.apache.avro.Schema; import org.apache.http.HttpEntity; import org.apache.http.HttpStatus; -import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.conn.ClientConnectionManager; import org.apache.http.impl.client.CloseableHttpClient; import org.mockito.ArgumentCaptor; @@ -43,7 +43,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; - import static org.mockito.Mockito.any; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; @@ -52,7 +51,6 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; public class KafkaBridgeTest { - private static final String TEST_TOPIC_NAME = "test_topic"; private static final String CLUSTER_NAME = "primary"; private static final String TOPIC_QUALIFIED_NAME = KafkaBridge.getTopicQualifiedName(CLUSTER_NAME, TEST_TOPIC_NAME); @@ -309,7 +307,7 @@ public class KafkaBridgeTest { when(mockResponse.getEntity()) .thenReturn(mock(HttpEntity.class)); when(mockResponse.getEntity().getContent()) - .thenReturn(new ByteArrayInputStream(new String("{\"subject\":\"test-value\",\"version\":1,\"id\":1,\"schema\":"+ TEST_SCHEMA +"}").getBytes(StandardCharsets.UTF_8))); + .thenReturn(new ByteArrayInputStream(new String("{\"subject\":\"test-value\",\"version\":1,\"id\":1,\"schema\":" + TEST_SCHEMA + "}").getBytes(StandardCharsets.UTF_8))); CloseableHttpClient mockHttpClient = mock(CloseableHttpClient.class); when(mockHttpClient.execute(any())) @@ -317,7 +315,7 @@ public class KafkaBridgeTest { when(mockHttpClient.getConnectionManager()) .thenReturn(mock(ClientConnectionManager.class)); - String ret = SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(mockHttpClient, TEST_SCHEMA_NAME,TEST_SCHEMA_VERSION); + String ret = SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(mockHttpClient, TEST_SCHEMA_NAME, TEST_SCHEMA_VERSION); assertEquals(TEST_SCHEMA, ret); } @@ -344,5 +342,4 @@ public class KafkaBridgeTest { assertEquals(TEST_SCHEMA_VERSION_LIST, ret); } - -} \ No newline at end of file +}