Repository: atlas
Updated Branches:
  refs/heads/master ce5ffeb71 -> fff94633d


ATLAS-2751: Atlas is not consuming messages from ATLAS_HOOK topic after 
recovering from zookeeper connection timeout.


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/fff94633
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/fff94633
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/fff94633

Branch: refs/heads/master
Commit: fff94633dcd4b80f903fdee0fb86ef021bbb5f5a
Parents: ce5ffeb
Author: Sarath Subramanian <[email protected]>
Authored: Sun Jun 10 16:55:10 2018 -0700
Committer: Sarath Subramanian <[email protected]>
Committed: Sun Jun 10 16:55:10 2018 -0700

----------------------------------------------------------------------
 .../apache/atlas/kafka/KafkaNotification.java   | 25 +++++++++++++++++---
 1 file changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/fff94633/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java 
b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 00e56e3..624dc55 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -56,6 +56,8 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
     public    static final String ATLAS_ENTITIES_TOPIC       = 
"ATLAS_ENTITIES";
     protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
 
+    private static final String DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE = "This 
consumer has already been closed.";
+
     private static final Map<NotificationType, String> TOPIC_MAP = new 
HashMap<NotificationType, String>() {
         {
             put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
@@ -67,6 +69,7 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
     private final Long          pollTimeOutMs;
     private       KafkaConsumer consumer;
     private       KafkaProducer producer;
+    private       String        consumerClosedErrorMsg;
 
     // ----- Constructors ----------------------------------------------------
 
@@ -85,8 +88,9 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
 
         Configuration kafkaConf = 
ApplicationProperties.getSubsetConfiguration(applicationProperties, 
PROPERTY_PREFIX);
 
-        properties    = ConfigurationConverter.getProperties(kafkaConf);
-        pollTimeOutMs = kafkaConf.getLong("poll.timeout.ms", 1000);
+        properties             = 
ConfigurationConverter.getProperties(kafkaConf);
+        pollTimeOutMs          = kafkaConf.getLong("poll.timeout.ms", 1000);
+        consumerClosedErrorMsg = 
kafkaConf.getString("error.message.consumer_closed", 
DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE);
 
         //Override default configs
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
@@ -223,7 +227,7 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
 
 
     public KafkaConsumer getKafkaConsumer(Properties consumerProperties, 
NotificationType type, boolean autoCommitEnabled) {
-        if(this.consumer == null) {
+        if (consumer == null || !isKafkaConsumerOpen(consumer)) {
             try {
                 String topic = TOPIC_MAP.get(type);
 
@@ -287,4 +291,19 @@ public class KafkaNotification extends 
AbstractNotification implements Service {
             return message;
         }
     }
+
+    // kafka-client has no method to check if a consumer is open, so probe it with listTopics();
+    // only an IllegalStateException carrying the configured "closed" message means not open
+    private boolean isKafkaConsumerOpen(KafkaConsumer consumer) {
+        boolean ret = true;
+
+        try {
+            consumer.listTopics();
+        } catch (IllegalStateException ex) {
+            String msg = ex.getMessage(); // may be null: guard before comparing to avoid an NPE
+            ret = msg == null || !msg.equalsIgnoreCase(consumerClosedErrorMsg);
+        }
+
+        return ret;
+    }
 }

Reply via email to