This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new ade75fe  ATLAS-4111: Add 'replicationFactor' attribute to kafka_topic entity type + formatting changes
ade75fe is described below

commit ade75fe55f88ef3b25be8da18acfbab3a8d293e2
Author: Barnabas Maidics <b.maid...@gmail.com>
AuthorDate: Mon Jan 25 14:26:45 2021 +0100

    ATLAS-4111: Add 'replicationFactor' attribute to kafka_topic entity type + formatting changes

    Signed-off-by: Sarath Subramanian <sar...@apache.org>
    (cherry picked from commit ce1342dd6dd7cbc4d1e35bea3b26d08dc24bd67e)
---
 .../org/apache/atlas/kafka/bridge/KafkaBridge.java |  66 ++++----
 .../patches/018-kafka_topic_add_rf_attribute.json  |  23 +++
 .../java/org/apache/atlas/utils/KafkaUtils.java    | 173 ++++++++++++++-------
 3 files changed, 174 insertions(+), 88 deletions(-)

diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
index d22010d..bf74c67 100644
--- a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
+++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
@@ -54,24 +54,23 @@ import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
 
 public class KafkaBridge {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaBridge.class);
-
-    private static final int EXIT_CODE_SUCCESS = 0;
-    private static final int EXIT_CODE_FAILED = 1;
-    private static final String ATLAS_ENDPOINT = "atlas.rest.address";
-    private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
-    private static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
-    private static final String KAFKA_METADATA_NAMESPACE = "atlas.metadata.namespace";
-    private static final String DEFAULT_CLUSTER_NAME = "primary";
-    private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
-    private static final String DESCRIPTION_ATTR = "description";
-    private static final String PARTITION_COUNT = "partitionCount";
-    private static final String NAME = "name";
-    private static final String URI = "uri";
-    private static final String CLUSTERNAME = "clusterName";
-    private static final String TOPIC = "topic";
-
-    private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME = "%s@%s";
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBridge.class);
+    private static final int EXIT_CODE_SUCCESS = 0;
+    private static final int EXIT_CODE_FAILED = 1;
+    private static final String ATLAS_ENDPOINT = "atlas.rest.address";
+    private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
+    private static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
+    private static final String KAFKA_METADATA_NAMESPACE = "atlas.metadata.namespace";
+    private static final String DEFAULT_CLUSTER_NAME = "primary";
+    private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+    private static final String DESCRIPTION_ATTR = "description";
+    private static final String PARTITION_COUNT = "partitionCount";
+    private static final String REPLICATION_FACTOR = "replicationFactor";
+    private static final String NAME = "name";
+    private static final String URI = "uri";
+    private static final String CLUSTERNAME = "clusterName";
+    private static final String TOPIC = "topic";
+    private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME = "%s@%s";
 
     private final List<String> availableTopics;
     private final String metadataNamespace;
@@ -80,9 +79,9 @@ public class KafkaBridge {
 
     public static void main(String[] args) {
-        int exitCode = EXIT_CODE_FAILED;
+        int exitCode = EXIT_CODE_FAILED;
         AtlasClientV2 atlasClientV2 = null;
-        KafkaUtils kafkaUtils = null;
+        KafkaUtils kafkaUtils = null;
 
         try {
             Options options = new Options();
@@ -114,14 +113,15 @@ public class KafkaBridge {
             kafkaUtils = new KafkaUtils(atlasConf);
 
             KafkaBridge importer = new KafkaBridge(atlasConf, atlasClientV2, kafkaUtils);
+
             if (StringUtils.isNotEmpty(fileToImport)) {
                 File f = new File(fileToImport);
 
                 if (f.exists() && f.canRead()) {
                     BufferedReader br = new BufferedReader(new FileReader(f));
-                    String line = null;
+                    String line;
 
-                    while((line = br.readLine()) != null) {
+                    while ((line = br.readLine()) != null) {
                         topicToImport = line.trim();
 
                         importer.importTopic(topicToImport);
@@ -138,15 +138,19 @@ public class KafkaBridge {
                 }
             }
         } catch(ParseException e) {
            LOG.error("Failed to parse arguments. Error: ", e.getMessage());
+
            printUsage();
         } catch(Exception e) {
            System.out.println("ImportKafkaEntities failed. Please check the log file for the detailed error message");
+
            e.printStackTrace();
+
            LOG.error("ImportKafkaEntities failed", e);
         } finally {
            if (atlasClientV2 != null) {
                atlasClientV2.close();
            }
+
            if (kafkaUtils != null) {
                kafkaUtils.close();
            }
@@ -175,16 +179,18 @@ public class KafkaBridge {
 
         if (StringUtils.isNotEmpty(topicToImport)) {
             List<String> topics_subset = new ArrayList<>();
-            for(String topic : topics) {
+
+            for (String topic : topics) {
                 if (Pattern.compile(topicToImport).matcher(topic).matches()) {
                     topics_subset.add(topic);
                 }
             }
+
             topics = topics_subset;
         }
 
         if (CollectionUtils.isNotEmpty(topics)) {
-            for(String topic : topics) {
+            for (String topic : topics) {
                 createOrUpdateTopic(topic);
             }
         }
@@ -234,11 +240,14 @@ public class KafkaBridge {
         ret.setAttribute(NAME,topic);
         ret.setAttribute(DESCRIPTION_ATTR, topic);
         ret.setAttribute(URI, topic);
+
         try {
             ret.setAttribute(PARTITION_COUNT, kafkaUtils.getPartitionCount(topic));
+            ret.setAttribute(REPLICATION_FACTOR, kafkaUtils.getReplicationFactor(topic));
         } catch (ExecutionException | InterruptedException e) {
-            LOG.error("Error while getting partition count for topic :" + topic, e);
-            throw new Exception("Error while getting partition count for topic :" + topic, e);
+            LOG.error("Error while getting partition data for topic :" + topic, e);
+
+            throw new Exception("Error while getting partition data for topic :" + topic, e);
         }
 
         return ret;
@@ -254,6 +263,7 @@ public class KafkaBridge {
 
         try {
             ret = findEntityInAtlas(KafkaDataTypes.KAFKA_TOPIC.getName(), topicQualifiedName);
+
             clearRelationshipAttributes(ret);
         } catch (Exception e) {
             ret = null; // entity doesn't exist in Atlas
@@ -288,7 +298,7 @@ public class KafkaBridge {
 
     @VisibleForTesting
     AtlasEntityWithExtInfo updateEntityInAtlas(AtlasEntityWithExtInfo entity) throws Exception {
-        AtlasEntityWithExtInfo ret = null;
+        AtlasEntityWithExtInfo ret;
         EntityMutationResponse response = atlasClientV2.updateEntity(entity);
 
         if (response != null) {
@@ -348,4 +358,4 @@ public class KafkaBridge {
             entity.getRelationshipAttributes().clear();
         }
     }
-}
+}
\ No newline at end of file
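The net effect of the bridge change above is that createOrUpdateTopic() now populates a
replicationFactor attribute alongside partitionCount when it builds the kafka_topic entity.
A minimal sketch of that entity construction, for context only: the wrapper class and method
below are invented for the example, while the type name, attribute names, and the "%s@%s"
qualified-name format come from the diff.

    import org.apache.atlas.model.instance.AtlasEntity;

    public class KafkaTopicEntityExample {
        // Builds a kafka_topic entity the way the bridge does, including the
        // replicationFactor attribute added by ATLAS-4111.
        public static AtlasEntity buildTopicEntity(String topic, String metadataNamespace,
                                                   int partitionCount, int replicationFactor) {
            AtlasEntity entity = new AtlasEntity("kafka_topic");

            // qualifiedName follows the FORMAT_KAKFA_TOPIC_QUALIFIED_NAME pattern "%s@%s"
            entity.setAttribute("qualifiedName", String.format("%s@%s", topic, metadataNamespace));
            entity.setAttribute("name", topic);
            entity.setAttribute("description", topic);
            entity.setAttribute("uri", topic);
            entity.setAttribute("partitionCount", partitionCount);
            entity.setAttribute("replicationFactor", replicationFactor); // new in this commit

            return entity;
        }
    }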
kafka_topic", + "action": "ADD_ATTRIBUTE", + "typeName": "kafka_topic", + "applyToVersion": "1.7", + "updateToVersion": "1.8", + "params": null, + "attributeDefs": [ + { + "name": "replicationFactor", + "typeName": "int", + "cardinality": "SINGLE", + "isIndexable": true, + "isOptional": true, + "isUnique": false + } + ] + } + ] +} diff --git a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java index 14e205a..eea3311 100644 --- a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java +++ b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java @@ -48,37 +48,37 @@ public class KafkaUtils implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class); - static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config"; - private static final String JAAS_CONFIG_PREFIX_PARAM = "atlas.jaas"; - private static final String JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM = "loginModuleName"; - private static final String JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM = "loginModuleControlFlag"; - private static final String JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG = "required"; + private static final String JAAS_CONFIG_PREFIX_PARAM = "atlas.jaas"; + private static final String JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM = "loginModuleName"; + private static final String JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM = "loginModuleControlFlag"; + private static final String JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG = "required"; private static final String JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS = "optional|requisite|sufficient|required"; - private static final String JAAS_CONFIG_LOGIN_OPTIONS_PREFIX = "option"; - private static final String JAAS_PRINCIPAL_PROP = "principal"; - private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient"; - private static final String JAAS_TICKET_BASED_CLIENT_NAME = "ticketBased-KafkaClient"; - private static final String IMPORT_INTERNAL_TOPICS = "atlas.kafka.bridge.enable.internal.topics.import"; + private static final String JAAS_CONFIG_LOGIN_OPTIONS_PREFIX = "option"; + private static final String JAAS_PRINCIPAL_PROP = "principal"; + private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient"; + private static final String JAAS_TICKET_BASED_CLIENT_NAME = "ticketBased-KafkaClient"; + private static final String IMPORT_INTERNAL_TOPICS = "atlas.kafka.bridge.enable.internal.topics.import"; - public static final String ATLAS_KAFKA_PROPERTY_PREFIX = "atlas.kafka"; - - final protected Properties kafkaConfiguration; + public static final String ATLAS_KAFKA_PROPERTY_PREFIX = "atlas.kafka"; + public static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config"; + final protected Properties kafkaConfiguration; final protected AdminClient adminClient; - - final protected boolean importInternalTopics; + final protected boolean importInternalTopics; public KafkaUtils(Configuration atlasConfiguration) { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("==> KafkaUtils() "); } + this.kafkaConfiguration = ApplicationProperties.getSubsetAsProperties(atlasConfiguration, ATLAS_KAFKA_PROPERTY_PREFIX); setKafkaJAASProperties(atlasConfiguration, kafkaConfiguration); - adminClient = AdminClient.create(this.kafkaConfiguration); + + adminClient = AdminClient.create(this.kafkaConfiguration); importInternalTopics = atlasConfiguration.getBoolean(IMPORT_INTERNAL_TOPICS, false); - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { 
LOG.debug("<== KafkaUtils() "); } } @@ -86,100 +86,143 @@ public class KafkaUtils implements AutoCloseable { public void createTopics(List<String> topicNames, int numPartitions, int replicationFactor) throws TopicExistsException, ExecutionException, InterruptedException { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("==> createTopics() "); } List<NewTopic> newTopicList = topicNames.stream() - .map(topicName -> new NewTopic(topicName, numPartitions, (short) replicationFactor)) - .collect(Collectors.toList()); + .map(topicName -> new NewTopic(topicName, numPartitions, (short) replicationFactor)) + .collect(Collectors.toList()); + + CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopicList); + Map<String, KafkaFuture<Void>> futureMap = createTopicsResult.values(); - CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopicList); - Map<String, KafkaFuture<Void>> futureMap = createTopicsResult.values(); - for(Map.Entry<String, KafkaFuture<Void>> futureEntry : futureMap.entrySet()) { - String topicName = futureEntry.getKey(); + for (Map.Entry<String, KafkaFuture<Void>> futureEntry : futureMap.entrySet()) { KafkaFuture<Void> future = futureEntry.getValue(); + future.get(); } - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("<== createTopics() "); } } public List<String> listAllTopics() throws ExecutionException, InterruptedException { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("==> KafkaUtils.listAllTopics() "); } + ListTopicsResult listTopicsResult = adminClient.listTopics((new ListTopicsOptions()).listInternal(importInternalTopics)); - List<String> topicNameList = new ArrayList<>(listTopicsResult.names().get()); + List<String> ret = new ArrayList<>(listTopicsResult.names().get()); - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("<== KafkaUtils.listAllTopics() "); } - return topicNameList; + return ret; } public Integer getPartitionCount(String topicName) throws ExecutionException, InterruptedException { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("==> KafkaUtils.getPartitionCount({})", topicName); } - Integer partitionCount = null; - DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topicName)); - Map<String, KafkaFuture<TopicDescription>> futureMap = describeTopicsResult.values(); - for(Map.Entry<String, KafkaFuture<TopicDescription>> futureEntry : futureMap.entrySet()) { - KafkaFuture<TopicDescription> topicDescriptionFuture = futureEntry.getValue(); - TopicDescription topicDescription = topicDescriptionFuture.get(); - List<TopicPartitionInfo> partitionList = topicDescription.partitions(); - partitionCount = partitionList.size(); + Integer ret = null; + List<TopicPartitionInfo> partitionList = getPartitionList(topicName); + + if (partitionList != null) { + ret = partitionList.size(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== KafkaUtils.getPartitionCount returning for topic {} with count {}", topicName, ret); } - if(LOG.isDebugEnabled()) { - LOG.debug("<== KafkaUtils.getPartitionCount returning for topic {} with count {}", topicName, partitionCount); + return ret; + } + + public Integer getReplicationFactor(String topicName) throws ExecutionException, InterruptedException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> KafkaUtils.getReplicationFactor({})", topicName); + } + + Integer ret = null; + List<TopicPartitionInfo> partitionList = getPartitionList(topicName); + + if (partitionList != 
null) { + ret = partitionList.stream().mapToInt(x -> x.replicas().size()).max().getAsInt(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== KafkaUtils.getReplicationFactor returning for topic {} with replicationFactor {}", topicName, ret); + } + + return ret; + } + + private List<TopicPartitionInfo> getPartitionList(String topicName) throws ExecutionException, InterruptedException { + List<TopicPartitionInfo> ret = null; + DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topicName)); + + if (describeTopicsResult != null) { + Map<String, KafkaFuture<TopicDescription>> futureMap = describeTopicsResult.values(); + + for (Map.Entry<String, KafkaFuture<TopicDescription>> futureEntry : futureMap.entrySet()) { + KafkaFuture<TopicDescription> topicDescriptionFuture = futureEntry.getValue(); + TopicDescription topicDescription = topicDescriptionFuture.get(); + + ret = topicDescription.partitions(); + } } - return partitionCount; + return ret; } public void close() { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("==> KafkaUtils.close()"); } - if(adminClient != null) { + if (adminClient != null) { adminClient.close(); } - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("<== KafkaUtils.close()"); } } public static void setKafkaJAASProperties(Configuration configuration, Properties kafkaProperties) { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("==> KafkaUtils.setKafkaJAASProperties()"); } - if(kafkaProperties.containsKey(KAFKA_SASL_JAAS_CONFIG_PROPERTY)) { + if (kafkaProperties.containsKey(KAFKA_SASL_JAAS_CONFIG_PROPERTY)) { LOG.debug("JAAS config is already set, returning"); + return; } Properties jaasConfig = ApplicationProperties.getSubsetAsProperties(configuration, JAAS_CONFIG_PREFIX_PARAM); // JAAS Configuration is present then update set those properties in sasl.jaas.config - if(jaasConfig != null && !jaasConfig.isEmpty()) { + + if (jaasConfig != null && !jaasConfig.isEmpty()) { String jaasClientName = JAAS_DEFAULT_CLIENT_NAME; // Required for backward compatability for Hive CLI if (!isLoginKeytabBased() && isLoginTicketBased()) { - LOG.debug("Checking if ticketBased-KafkaClient is set"); + if (LOG.isDebugEnabled()) { + LOG.debug("Checking if ticketBased-KafkaClient is set"); + } + // if ticketBased-KafkaClient property is not specified then use the default client name String ticketBasedConfigPrefix = JAAS_CONFIG_PREFIX_PARAM + "." + JAAS_TICKET_BASED_CLIENT_NAME; Configuration ticketBasedConfig = configuration.subset(ticketBasedConfigPrefix); - if(ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) { - LOG.debug("ticketBased-KafkaClient JAAS configuration is set, using it"); + if (ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("ticketBased-KafkaClient JAAS configuration is set, using it"); + } jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME; } else { @@ -193,24 +236,31 @@ public class KafkaUtils implements AutoCloseable { if (loginModuleName == null) { LOG.error("Unable to add JAAS configuration for client [{}] as it is missing param [{}]. 
Skipping JAAS config for [{}]", jaasClientName, keyParam, jaasClientName); + return; } keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM; + String controlFlag = jaasConfig.getProperty(keyParam); - if(StringUtils.isEmpty(controlFlag)) { + if (StringUtils.isEmpty(controlFlag)) { String validValues = JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS; + controlFlag = JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG; + LOG.warn("Unknown JAAS configuration value for ({}) = [{}], valid value are [{}] using the default value, REQUIRED", keyParam, controlFlag, validValues); } - String optionPrefix = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX + "."; - String principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP; - int optionPrefixLen = optionPrefix.length(); + + String optionPrefix = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX + "."; + String principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP; + int optionPrefixLen = optionPrefix.length(); StringBuffer optionStringBuffer = new StringBuffer(); + for (String key : jaasConfig.stringPropertyNames()) { if (key.startsWith(optionPrefix)) { String optionVal = jaasConfig.getProperty(key); + if (optionVal != null) { optionVal = optionVal.trim(); @@ -223,16 +273,18 @@ public class KafkaUtils implements AutoCloseable { } optionVal = surroundWithQuotes(optionVal); + optionStringBuffer.append(String.format(" %s=%s", key.substring(optionPrefixLen), optionVal)); } } } String newJaasProperty = String.format("%s %s %s ;", loginModuleName.trim(), controlFlag, optionStringBuffer.toString()); + kafkaProperties.put(KAFKA_SASL_JAAS_CONFIG_PROPERTY, newJaasProperty); } - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("<== KafkaUtils.setKafkaJAASProperties()"); } } @@ -262,17 +314,19 @@ public class KafkaUtils implements AutoCloseable { } static String surroundWithQuotes(String optionVal) { - if(StringUtils.isEmpty(optionVal)) { + if (StringUtils.isEmpty(optionVal)) { return optionVal; } + String ret = optionVal; // For property values which have special chars like "@" or "/", we need to enclose it in // double quotes, so that Kafka can parse it // If the property is already enclosed in double quotes, then do nothing. - if(optionVal.indexOf(0) != '"' && optionVal.indexOf(optionVal.length() - 1) != '"') { + if (optionVal.indexOf(0) != '"' && optionVal.indexOf(optionVal.length() - 1) != '"') { // If the string as special characters like except _,- final String SPECIAL_CHAR_LIST = "/!@#%^&*"; + if (StringUtils.containsAny(optionVal, SPECIAL_CHAR_LIST)) { ret = String.format("\"%s\"", optionVal); } @@ -280,5 +334,4 @@ public class KafkaUtils implements AutoCloseable { return ret; } - -} +} \ No newline at end of file
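For context on the new getReplicationFactor(): it reports the largest replica-set size across
the topic's partitions, since partitions of a topic can temporarily carry different replica
counts (e.g. during a reassignment). An illustrative standalone equivalent against a plain
Kafka AdminClient is sketched below; the bootstrap address and topic name are placeholders,
and orElse(0) stands in for the commit's getAsInt() to avoid a NoSuchElementException on a
topic with no partitions.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.TopicDescription;

    public class ReplicationFactorProbe {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

            try (AdminClient adminClient = AdminClient.create(props)) {
                TopicDescription description = adminClient.describeTopics(Collections.singleton("my-topic"))
                                                           .all().get().get("my-topic");

                // Same computation as KafkaUtils.getReplicationFactor(): the largest
                // replica set across the topic's partitions.
                int replicationFactor = description.partitions().stream()
                                                   .mapToInt(p -> p.replicas().size())
                                                   .max().orElse(0);

                System.out.println("replicationFactor(my-topic) = " + replicationFactor);
            }
        }
    }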