This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new dbdfcd2 ATLAS-4155: NotificationHookConsumer: Fix for Large Message Processing Problem dbdfcd2 is described below commit dbdfcd27f264c6aa0bc6333ece5b992f9f985885 Author: Ashutosh Mestry <ames...@cloudera.com> AuthorDate: Wed Feb 17 21:17:30 2021 -0800 ATLAS-4155: NotificationHookConsumer: Fix for Large Message Processing Problem --- .../org/apache/atlas/kafka/AtlasKafkaConsumer.java | 73 ++++++++++++++-------- .../atlas/notification/NotificationConsumer.java | 9 ++- .../AbstractNotificationConsumerTest.java | 6 ++ .../notification/NotificationHookConsumer.java | 50 +++------------ .../NotificationHookConsumerKafkaTest.java | 5 -- 5 files changed, 71 insertions(+), 72 deletions(-) diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java index c38a504..f7d9668 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java +++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java @@ -20,12 +20,14 @@ package org.apache.atlas.kafka; import org.apache.atlas.notification.AbstractNotificationConsumer; import org.apache.atlas.notification.AtlasNotificationMessageDeserializer; import org.apache.atlas.notification.NotificationInterface; +import org.apache.commons.collections.MapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -63,7 +65,40 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> { @Override public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) { + return receive(this.pollTimeoutMilliSeconds, null); + } + + @Override + public List<AtlasKafkaMessage<T>> 
receiveWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) { + return receive(this.pollTimeoutMilliSeconds, lastCommittedPartitionOffset); + } + + + @Override + public void commit(TopicPartition partition, long offset) { + if (!autoCommitEnabled) { + if (LOG.isDebugEnabled()) { + LOG.info(" commiting the offset ==>> " + offset); + } + kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset))); + } + } + + @Override + public void close() { + if (kafkaConsumer != null) { + kafkaConsumer.close(); + } + } + + @Override + public void wakeup() { + if (kafkaConsumer != null) { + kafkaConsumer.wakeup(); + } + } + private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPartition, Long> lastCommittedPartitionOffset) { List<AtlasKafkaMessage<T>> messages = new ArrayList(); ConsumerRecords<?, ?> records = kafkaConsumer != null ? kafkaConsumer.poll(timeoutMilliSeconds) : null; @@ -75,13 +110,24 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> { record.topic(), record.partition(), record.offset(), record.key(), record.value()); } + TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); + if (MapUtils.isNotEmpty(lastCommittedPartitionOffset) + && lastCommittedPartitionOffset.containsKey(topicPartition) + && record.offset() < lastCommittedPartitionOffset.get(topicPartition)) { + + commit(topicPartition, record.offset()); + LOG.info("Skipping already processed message: topic={}, partition={} offset={}. 
Last processed offset={}", + record.topic(), record.partition(), record.offset(), lastCommittedPartitionOffset.get(topicPartition)); + continue; + } + T message = null; try { message = deserializer.deserialize(record.value().toString()); } catch (OutOfMemoryError excp) { LOG.error("Ignoring message that failed to deserialize: topic={}, partition={}, offset={}, key={}, value={}", - record.topic(), record.partition(), record.offset(), record.key(), record.value(), excp); + record.topic(), record.partition(), record.offset(), record.key(), record.value(), excp); } if (message == null) { @@ -95,29 +141,4 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> { return messages; } - - - @Override - public void commit(TopicPartition partition, long offset) { - if (!autoCommitEnabled) { - if (LOG.isDebugEnabled()) { - LOG.info(" commiting the offset ==>> " + offset); - } - kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset))); - } - } - - @Override - public void close() { - if (kafkaConsumer != null) { - kafkaConsumer.close(); - } - } - - @Override - public void wakeup() { - if (kafkaConsumer != null) { - kafkaConsumer.wakeup(); - } - } } diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java index f3e81ec..1fb9f99 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java @@ -18,6 +18,8 @@ package org.apache.atlas.notification; import java.util.List; +import java.util.Map; + import org.apache.kafka.common.TopicPartition; import org.apache.atlas.kafka.AtlasKafkaMessage; @@ -55,5 +57,10 @@ public interface NotificationConsumer<T> { List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds); - + /** + * Fetch data for the topics from Kafka, if lastCommittedOffset 
same as message + * received offset, it will proceed with commit. + * @return List containing kafka message and partionId and offset. + */ + List<AtlasKafkaMessage<T>> receiveWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset); } diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java index 05d0d81..1b486e5 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java @@ -30,6 +30,7 @@ import org.testng.annotations.Test; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Objects; import static org.mockito.Mockito.mock; @@ -223,6 +224,11 @@ public class AbstractNotificationConsumerTest { } return tempMessageList; } + + @Override + public List<AtlasKafkaMessage<TestMessage>> receiveWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) { + return receive(); + } } public static class TestMessageDeserializer extends AbstractMessageDeserializer<TestMessage> { diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 0e58dac..84cc8d8 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -67,6 +67,7 @@ import org.apache.commons.collections.MapUtils; import org.apache.commons.collections4.map.PassiveExpiringMap; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; +import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
org.springframework.context.annotation.DependsOn; @@ -84,6 +85,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -188,6 +190,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final Configuration applicationProperties; private ExecutorService executors; private Instant nextStatsLogTime = AtlasMetricsCounter.getNextHourStartTime(Instant.now()); + private final Map<TopicPartition, Long> lastCommittedPartitionOffset; @VisibleForTesting final int consumerRetryInterval; @@ -205,7 +208,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl this.instanceConverter = instanceConverter; this.typeRegistry = typeRegistry; this.applicationProperties = ApplicationProperties.get(); - this.metricsUtil = metricsUtil; + this.metricsUtil = metricsUtil; + this.lastCommittedPartitionOffset = new HashMap<>(); maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1); @@ -517,14 +521,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final List<String> failedMessages = new ArrayList<>(); private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration); - @VisibleForTesting - final FailedCommitOffsetRecorder failedCommitOffsetRecorder; - public HookConsumer(NotificationConsumer<HookNotification> consumer) { super("atlas-hook-consumer-thread", false); this.consumer = consumer; - failedCommitOffsetRecorder = new FailedCommitOffsetRecorder(); } @Override @@ -540,7 +540,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl try { while (shouldRun.get()) { try { - List<AtlasKafkaMessage<HookNotification>> messages = 
consumer.receive(); + List<AtlasKafkaMessage<HookNotification>> messages = consumer.receiveWithCheckedCommit(lastCommittedPartitionOffset); for (AtlasKafkaMessage<HookNotification> msg : messages) { handleMessage(msg); @@ -586,11 +586,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } try { - if(failedCommitOffsetRecorder.isMessageReplayed(kafkaMsg.getOffset())) { - commit(kafkaMsg); - return; - } - // covert V1 messages to V2 to enable preProcess try { switch (message.getType()) { @@ -919,16 +914,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } private void commit(AtlasKafkaMessage<HookNotification> kafkaMessage) { - boolean commitSucceessStatus = false; - try { - recordFailedMessages(); - - consumer.commit(kafkaMessage.getTopicPartition(), kafkaMessage.getOffset() + 1); + recordFailedMessages(); - commitSucceessStatus = true; - } finally { - failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset()); - } + long commitOffset = kafkaMessage.getOffset() + 1; + lastCommittedPartitionOffset.put(kafkaMessage.getTopicPartition(), commitOffset); + consumer.commit(kafkaMessage.getTopicPartition(), commitOffset); } boolean serverAvailable(Timer timer) { @@ -1330,24 +1320,4 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl return ret; } - - static class FailedCommitOffsetRecorder { - private Long currentOffset; - - public void recordIfFailed(boolean commitStatus, long offset) { - if(commitStatus) { - currentOffset = null; - } else { - currentOffset = offset; - } - } - - public boolean isMessageReplayed(long offset) { - return currentOffset != null && currentOffset == offset; - } - - public Long getCurrentOffset() { - return currentOffset; - } - } } diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java index 33191a7..65e8b50 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java @@ -130,7 +130,6 @@ public class NotificationHookConsumerKafkaTest { ExceptionThrowingCommitConsumer consumer = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); - NotificationHookConsumer.FailedCommitOffsetRecorder failedCommitOffsetRecorder = hookConsumer.failedCommitOffsetRecorder; produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity())); @@ -143,16 +142,12 @@ public class NotificationHookConsumerKafkaTest { assertTrue(true, "ExceptionThrowing consumer throws an excepion."); } - assertTrue(failedCommitOffsetRecorder.getCurrentOffset() > -1); - consumer.disableCommitExpcetion(); produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity())); consumeOneMessage(consumer, hookConsumer); consumeOneMessage(consumer, hookConsumer); - assertNull(failedCommitOffsetRecorder.getCurrentOffset()); - reset(atlasEntityStore); }