Repository: atlas Updated Branches: refs/heads/master ce5ffeb71 -> fff94633d
ATLAS-2751: Atlas is not consuming messages from ATLAS_HOOK topic after recovering from zookeeper connection timeout. Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/fff94633 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/fff94633 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/fff94633 Branch: refs/heads/master Commit: fff94633dcd4b80f903fdee0fb86ef021bbb5f5a Parents: ce5ffeb Author: Sarath Subramanian <[email protected]> Authored: Sun Jun 10 16:55:10 2018 -0700 Committer: Sarath Subramanian <[email protected]> Committed: Sun Jun 10 16:55:10 2018 -0700 ---------------------------------------------------------------------- .../apache/atlas/kafka/KafkaNotification.java | 25 +++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/fff94633/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 00e56e3..624dc55 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -56,6 +56,8 @@ public class KafkaNotification extends AbstractNotification implements Service { public static final String ATLAS_ENTITIES_TOPIC = "ATLAS_ENTITIES"; protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id"; + private static final String DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE = "This consumer has already been closed."; + private static final Map<NotificationType, String> TOPIC_MAP = new HashMap<NotificationType, String>() { { put(NotificationType.HOOK, ATLAS_HOOK_TOPIC); @@ -67,6 +69,7 @@ public class KafkaNotification extends AbstractNotification implements Service { private final Long pollTimeOutMs; private KafkaConsumer consumer; private KafkaProducer producer; + private String consumerClosedErrorMsg; // ----- Constructors ---------------------------------------------------- @@ -85,8 +88,9 @@ public class KafkaNotification extends AbstractNotification implements Service { Configuration kafkaConf = ApplicationProperties.getSubsetConfiguration(applicationProperties, PROPERTY_PREFIX); - properties = ConfigurationConverter.getProperties(kafkaConf); - pollTimeOutMs = kafkaConf.getLong("poll.timeout.ms", 1000); + properties = ConfigurationConverter.getProperties(kafkaConf); + pollTimeOutMs = kafkaConf.getLong("poll.timeout.ms", 1000); + consumerClosedErrorMsg = kafkaConf.getString("error.message.consumer_closed", DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE); //Override default configs properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); @@ -223,7 +227,7 @@ public class KafkaNotification extends AbstractNotification implements Service { public KafkaConsumer getKafkaConsumer(Properties consumerProperties, NotificationType type, boolean autoCommitEnabled) { - if(this.consumer == null) { + if (consumer == null || !isKafkaConsumerOpen(consumer)) { try { String topic = TOPIC_MAP.get(type); @@ -287,4 +291,19 @@ public class KafkaNotification extends AbstractNotification implements Service { return message; } } + + // kafka-client doesn't have method to check if consumer is open, hence checking list topics and catching exception + private boolean isKafkaConsumerOpen(KafkaConsumer consumer) { + boolean ret = true; + + try { + consumer.listTopics(); + } catch (IllegalStateException ex) { + if (ex.getMessage().equalsIgnoreCase(consumerClosedErrorMsg)) { + ret = false; + } + } + + return ret; + } }
