This is an automated email from the ASF dual-hosted git repository. nbonte pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
commit c3df22605795b9bddcba547852c04461ee9b8203 Author: Ashutosh Mestry <ames...@cloudera.com> AuthorDate: Thu May 20 15:07:27 2021 -0700 ATLAS-4152: Entity correlation for deleted entities. Signed-off-by: Nikhil Bonte <nbo...@apache.org> --- .../org/apache/atlas/repository/Constants.java | 2 + .../notification/AtlasNotificationMessage.java | 23 ++++-- .../org/apache/atlas/kafka/AtlasKafkaConsumer.java | 3 +- .../org/apache/atlas/kafka/AtlasKafkaMessage.java | 17 ++++- .../org/apache/atlas/kafka/KafkaNotification.java | 12 ++++ .../AtlasNotificationMessageDeserializer.java | 17 ++++- .../atlas/notification/NotificationInterface.java | 9 +++ .../atlas/notification/spool/AtlasFileSpool.java | 30 ++++++-- .../apache/atlas/notification/spool/Publisher.java | 40 +++++++++-- .../notification/spool/SpoolConfiguration.java | 16 +++++ .../apache/atlas/notification/spool/Spooler.java | 17 ++++- .../notification/AbstractNotificationTest.java | 5 ++ .../notification/spool/AtlasFileSpoolTest.java | 5 ++ .../repository/graph/GraphBackedSearchIndexer.java | 1 + .../store/graph/EntityCorrelationStore.java | 53 ++++++++++++++ .../store/graph/v2/AtlasGraphUtilsV2.java | 21 ++++++ .../store/graph/v2/EntityCorrelationStoreTest.java | 83 ++++++++++++++++++++++ .../notification/EntityCorrelationManager.java | 60 ++++++++++++++++ .../notification/NotificationHookConsumer.java | 15 +++- .../preprocessor/EntityPreprocessor.java | 16 ++++- .../preprocessor/HiveDbDDLPreprocessor.java | 52 ++++++++++++++ .../preprocessor/HivePreprocessor.java | 28 ++++++++ .../preprocessor/HiveTableDDLPreprocessor.java | 52 ++++++++++++++ .../preprocessor/PreprocessorContext.java | 17 ++++- .../NotificationHookConsumerKafkaTest.java | 6 +- .../notification/NotificationHookConsumerTest.java | 22 +++--- 26 files changed, 579 insertions(+), 43 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index 
ffcec97..aea0c13 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -135,6 +135,8 @@ public final class Constants { public static final String TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "timestamp"); + public static final String ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "entityDeletedTimestamp"); + public static final String MODIFICATION_TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "modificationTimestamp"); public static final String IS_INCOMPLETE_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "isIncomplete"); diff --git a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java index 810ba97..5869910 100644 --- a/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java +++ b/intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java @@ -40,9 +40,10 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ @XmlRootElement @XmlAccessorType(XmlAccessType.PROPERTY) public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage { - private String msgSourceIP; - private String msgCreatedBy; - private long msgCreationTime; + private String msgSourceIP; + private String msgCreatedBy; + private long msgCreationTime; + private boolean spooled; /** * The actual message. 
@@ -55,18 +56,22 @@ public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage { } public AtlasNotificationMessage(MessageVersion version, T message) { - this(version, message, null, null); + this(version, message, null, null, false); } - public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) { + public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy, boolean spooled) { super(version); this.msgSourceIP = msgSourceIP; this.msgCreatedBy = createdBy; this.msgCreationTime = (new Date()).getTime(); this.message = message; + this.spooled = spooled; } + public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) { + this(version, message, msgSourceIP, createdBy, false); + } public String getMsgSourceIP() { return msgSourceIP; @@ -99,4 +104,12 @@ public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage { public void setMessage(T message) { this.message = message; } + + public void setSpooled(boolean val) { + this.spooled = val; + } + + public boolean getSpooled() { + return spooled; + } } diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java index f7d9668..96dc585 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java +++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java @@ -134,7 +134,8 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> { continue; } - messages.add(new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition())); + messages.add(new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition(), + deserializer.getMsgCreated(), deserializer.getSpooled())); } } diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java 
b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java index 22bd79f..af3727d 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java +++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java @@ -24,11 +24,19 @@ public class AtlasKafkaMessage<T> { private final T message; private final long offset; private final TopicPartition topicPartition; + private final boolean spooled; + private final long msgCreated; - public AtlasKafkaMessage(T message, long offset, String topic, int partition) { + public AtlasKafkaMessage(T message, long offset, String topic, int partition, long msgCreated, boolean spooled) { this.message = message; this.offset = offset; this.topicPartition = new TopicPartition(topic, partition); + this.msgCreated = msgCreated; + this.spooled = spooled; + } + + public AtlasKafkaMessage(T message, long offset, String topic, int partition) { + this(message, offset, topic, partition, 0, false); } public T getMessage() { @@ -51,4 +59,11 @@ public class AtlasKafkaMessage<T> { return topicPartition.partition(); } + public boolean getSpooled() { + return this.spooled; + } + + public long getMsgCreated() { + return this.msgCreated; + } } diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 3d1b3cc..32f5183 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -172,6 +172,18 @@ public class KafkaNotification extends AbstractNotification implements Service { // ----- NotificationInterface ------------------------------------------- + public boolean isReady(NotificationType notificationType) { + try { + KafkaProducer producer = getOrCreateProducer(notificationType); + producer.metrics(); + return true; + } + catch (Exception exception) { + LOG.error("Error: Connecting... 
{}", exception.getMessage()); + return false; + } + } + @Override public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) { return createConsumers(notificationType, numConsumers, Boolean.valueOf(properties.getProperty("enable.auto.commit", properties.getProperty("auto.commit.enable","false")))); diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java index 3264e26..b43bc7c 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java +++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java @@ -62,6 +62,8 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message private long splitMessagesLastPurgeTime = System.currentTimeMillis(); private final AtomicLong messageCountTotal = new AtomicLong(0); private final AtomicLong messageCountSinceLastInterval = new AtomicLong(0); + private long msgCreated; + private boolean spooled; // ----- Constructors ---------------------------------------------------- /** @@ -101,18 +103,31 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message } // ----- MessageDeserializer --------------------------------------------- + public long getMsgCreated() { + return this.msgCreated; + } + + public boolean getSpooled() { + return this.spooled; + } + @Override public T deserialize(String messageJson) { final T ret; messageCountTotal.incrementAndGet(); messageCountSinceLastInterval.incrementAndGet(); + this.msgCreated = 0; + this.spooled = false; - AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, AtlasNotificationBaseMessage.class); + AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, AtlasNotificationMessage.class); if (msg == null || msg.getVersion() 
== null) { // older style messages not wrapped with AtlasNotificationMessage ret = AtlasType.fromV1Json(messageJson, messageType); } else { + this.msgCreated = ((AtlasNotificationMessage) msg).getMsgCreationTime(); + this.spooled = ((AtlasNotificationMessage) msg).getSpooled(); + String msgJson = messageJson; if (msg.getMsgSplitCount() > 1) { // multi-part message diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java index edd8ed9..3d8d9cc 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java @@ -110,4 +110,13 @@ public interface NotificationInterface { * Shutdown any notification producers and consumers associated with this interface instance. */ void close(); + + /** + * Check if underlying notification mechanism is ready for use. 
+ * + * @param type the message type + * @return true if available, false otherwise + * + */ + boolean isReady(NotificationType type); } diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java index 2d7d195..ea31284 100644 --- a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java +++ b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java @@ -27,6 +27,7 @@ import org.apache.commons.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -51,7 +52,7 @@ public class AtlasFileSpool implements NotificationInterface { @Override public void init(String source, Object failedMessagesLogger) { - LOG.info("==> AtlasFileSpool.init(source={})", source); + LOG.debug("==> AtlasFileSpool.init(source={})", source); if (!isInitDone()) { try { @@ -76,10 +77,10 @@ public class AtlasFileSpool implements NotificationInterface { LOG.error("AtlasFileSpool(source={}): initialization failed, unknown error", this.config.getSourceName(), t); } } else { - LOG.info("AtlasFileSpool.init(): initialization already done. initDone={}", initDone); + LOG.debug("AtlasFileSpool.init(): initialization already done.
initDone={}", initDone); } - LOG.info("<== AtlasFileSpool.init(source={})", source); + LOG.debug("<== AtlasFileSpool.init(source={})", source); } @Override @@ -100,29 +101,35 @@ public class AtlasFileSpool implements NotificationInterface { } @Override + public boolean isReady(NotificationType type) { + return true; + } + + @Override public <T> void send(NotificationType type, List<T> messages) throws NotificationException { + List<String> serializedMessages = getSerializedMessages(messages); if (hasInitSucceeded() && (this.indexManagement.isPending() || this.publisher.isDestinationDown())) { if (LOG.isDebugEnabled()) { LOG.debug("AtlasFileSpool.send(): sending to spooler"); } - spooler.send(type, messages); + spooler.sendInternal(type, serializedMessages); } else { if (LOG.isDebugEnabled()) { LOG.debug("AtlasFileSpool.send(): sending to notificationHandler"); } try { - notificationHandler.send(type, messages); + notificationHandler.sendInternal(type, serializedMessages); } catch (Exception e) { if (isInitDone()) { LOG.info("AtlasFileSpool.send(): failed in sending to notificationHandler. Sending to spool", e); publisher.setDestinationDown(); - spooler.send(type, messages); + spooler.sendInternal(type, serializedMessages); } else { - LOG.warn("AtlasFileSpool.send(): failed in sending to notificationHandler. Not sending to spool, as it is not yet initialized", e); + LOG.warn("AtlasFileSpool.send(): failed in sending to notificationHandler. 
Not sending to spool, as it is not initialized.", e); throw e; } @@ -130,6 +137,15 @@ public class AtlasFileSpool implements NotificationInterface { } } + private <T> List<String> getSerializedMessages(List<T> messages) { + List<String> serializedMessages = new ArrayList<>(messages.size()); + for (int index = 0; index < messages.size(); index++) { + notificationHandler.createNotificationMessages(messages.get(index), serializedMessages); + } + + return serializedMessages; + } + @Override public void close() { try { diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java b/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java index 22242c9..01ead7d 100644 --- a/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java +++ b/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java @@ -65,14 +65,16 @@ public class Publisher implements Runnable { IndexRecord record = null; while (true) { - waitIfDestinationDown(); - + checkAndWaitIfDestinationDown(); if (this.isDrain) { break; } - record = fetchNext(record); + if (isDestDown) { + continue; + } + record = fetchNext(record); if (record != null && processAndDispatch(record)) { indexManagement.removeAsDone(record); @@ -104,14 +106,14 @@ public class Publisher implements Runnable { return isDestDown; } - private void waitIfDestinationDown() throws InterruptedException { + private void checkAndWaitIfDestinationDown() throws InterruptedException { + isDestDown = !notificationHandler.isReady(NotificationInterface.NotificationType.HOOK); if (isDestDown) { LOG.info("Publisher.waitIfDestinationDown(source={}): {}: Destination is down. Sleeping for: {} ms. 
Queue: {} items", this.source, notificationHandlerName, retryDestinationMS, indexManagement.getQueueSize()); Thread.sleep(retryDestinationMS); } - } private IndexRecord fetchNext(IndexRecord record) { @@ -147,7 +149,7 @@ public class Publisher implements Runnable { messages.add(message); - if ((isDestDown && messages.size() == 1) || messages.size() == messageBatchSize) { + if (messages.size() == messageBatchSize) { dispatch(record, lineInSpoolFile, messages); } } @@ -192,6 +194,8 @@ public class Publisher implements Runnable { private void dispatch(String filePath, List<String> messages) throws Exception { try { + pauseBeforeSend(); + notificationHandler.sendInternal(NotificationInterface.NotificationType.HOOK, messages); if (isDestDown) { @@ -207,4 +211,28 @@ public class Publisher implements Runnable { messages.clear(); } } + + /** + * Reason for pauseBeforeSend: + * - Entity correlation is needed to stitch lineage to the correct entity. + * - Background: When messages are added to the Kafka queue directly, ordering is incidentally preserved: + * messages from lineage-producing hooks arrive immediately after messages from entity-producing hooks. + * - When spooled messages are posted to Kafka, this order cannot be guaranteed. The entity correlation logic within Atlas + * can attach lineage to the correct entity, provided that the entity participating in the lineage is already present. + * + * Entity correlation works well for the majority of cases, except where lineage entities are created before regular entities. + * In that case, shell entities are created in the absence of real entities. The problem is that there is only one shell entity for any number of references. + * Circumventing this limitation is not easy. + * + * pauseBeforeSend delays dispatch for every source other than the HiveMetaStore hook, so that HiveMetaStore-generated messages reach Kafka before messages from lineage-producing hooks.
+ * + * @throws InterruptedException + */ + private void pauseBeforeSend() throws InterruptedException { + if (!configuration.isHiveMetaStore()) { + int waitMs = configuration.getPauseBeforeSendSec() * 1000; + LOG.info("Waiting before dispatch: {}", waitMs); + Thread.sleep(waitMs); + } + } } diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java index a9a3a78..74b8687 100644 --- a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java +++ b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java @@ -25,9 +25,11 @@ public class SpoolConfiguration { private static final int PROP_RETRY_DESTINATION_MS_DEFAULT = 30000; // Default 30 seconds private static final int PROP_FILE_ROLLOVER_SEC_DEFAULT = 60; // 60 secs private static final int PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT = 100; + private static final int PROP_PAUSE_BEFORE_SEND_MS_DEFAULT = 60; private static final String PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT = "archive"; private static final String PROP_FILE_SPOOL_LOCAL_DIR_DEFAULT = "/tmp/spool"; private static final int PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT = 100; + private static final String PROP_HIVE_METASTORE_NAME_DEFAULT = "HiveMetastoreHookImpl"; private static final String PROPERTY_PREFIX_SPOOL = "atlas.hook.spool."; public static final String PROP_FILE_SPOOL_LOCAL_DIR = PROPERTY_PREFIX_SPOOL + "dir"; private static final String PROP_FILE_SPOOL_ARCHIVE_DIR = PROPERTY_PREFIX_SPOOL + "archive.dir"; @@ -35,6 +37,8 @@ public class SpoolConfiguration { public static final String PROP_FILE_SPOOL_FILE_ROLLOVER_SEC = PROPERTY_PREFIX_SPOOL + "file.rollover.sec"; public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = PROPERTY_PREFIX_SPOOL + "destination.retry.ms"; private static final String PROP_MESSAGE_BATCH_SIZE = PROPERTY_PREFIX_SPOOL + 
"destination.message.batchsize"; + private static final String PROP_FILE_SPOOL_PAUSE_BEFORE_SEND_SEC = PROPERTY_PREFIX_SPOOL + "pause.before.send.sec"; + private static final String PROP_HIVE_METASTORE_NAME = PROPERTY_PREFIX_SPOOL + "hivemetastore.name"; private final String messageHandlerName; private final int maxArchivedFilesCount; @@ -44,6 +48,8 @@ public class SpoolConfiguration { private final int fileSpoolMaxFilesCount; private final String spoolDirPath; private final String archiveDir; + private final int pauseBeforeSendSec; + private final String hiveMetaStoreName; private String sourceName; public SpoolConfiguration(Configuration cfg, String messageHandlerName) { @@ -51,10 +57,12 @@ public class SpoolConfiguration { this.maxArchivedFilesCount = cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT); this.messageBatchSize = cfg.getInt(PROP_MESSAGE_BATCH_SIZE, PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT); this.retryDestinationMS = cfg.getInt(PROP_FILE_SPOOL_DEST_RETRY_MS, PROP_RETRY_DESTINATION_MS_DEFAULT); + this.pauseBeforeSendSec = cfg.getInt(PROP_FILE_SPOOL_PAUSE_BEFORE_SEND_SEC, PROP_PAUSE_BEFORE_SEND_MS_DEFAULT); this.fileRollOverSec = cfg.getInt(PROP_FILE_SPOOL_FILE_ROLLOVER_SEC, PROP_FILE_ROLLOVER_SEC_DEFAULT) * 1000; this.fileSpoolMaxFilesCount = cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT); this.spoolDirPath = cfg.getString(SpoolConfiguration.PROP_FILE_SPOOL_LOCAL_DIR, PROP_FILE_SPOOL_LOCAL_DIR_DEFAULT); this.archiveDir = cfg.getString(PROP_FILE_SPOOL_ARCHIVE_DIR, new File(getSpoolDirPath(), PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT).toString()); + this.hiveMetaStoreName = cfg.getString(PROP_HIVE_METASTORE_NAME, PROP_HIVE_METASTORE_NAME_DEFAULT); } public void setSource(String val) { @@ -120,4 +128,12 @@ public class SpoolConfiguration { return new File(getSpoolDir(), fileDoneName); } + + public int getPauseBeforeSendSec() { + return pauseBeforeSendSec; + } + 
+ public boolean isHiveMetaStore() { + return this.sourceName.equals(this.hiveMetaStoreName); + } } diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java b/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java index 2cacaaa..a918e9b 100644 --- a/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java +++ b/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java @@ -19,14 +19,14 @@ package org.apache.atlas.notification.spool; import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.hook.FailedMessagesLogger; +import org.apache.atlas.model.notification.AtlasNotificationMessage; import org.apache.atlas.notification.AbstractNotification; import org.apache.atlas.notification.NotificationConsumer; -import org.apache.commons.io.IOUtils; +import org.apache.atlas.type.AtlasType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.DataOutput; -import java.io.PrintWriter; import java.util.List; public class Spooler extends AbstractNotification { @@ -57,8 +57,14 @@ public class Spooler extends AbstractNotification { @Override public void sendInternal(NotificationType type, List<String> messages) { - boolean ret = write(messages); + for (int i = 0; i < messages.size(); i++) { + AtlasNotificationMessage e = AtlasType.fromV1Json(messages.get(i), AtlasNotificationMessage.class); + e.setSpooled(true); + + messages.set(i, AtlasType.toV1Json(e)); + } + boolean ret = write(messages); if (failedMessagesLogger != null && !ret) { writeToFailedMessages(messages); } @@ -68,6 +74,11 @@ public class Spooler extends AbstractNotification { public void close() { } + @Override + public boolean isReady(NotificationType type) { + return true; + } + @VisibleForTesting boolean write(List<String> messages) { final boolean ret; diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java 
b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java index d7e4959..8078a6c 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java @@ -127,5 +127,10 @@ public class AbstractNotificationTest { @Override public void close() { } + + @Override + public boolean isReady(NotificationType type) { + return true; + } } } diff --git a/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java b/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java index 167efbe..265598e 100644 --- a/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java @@ -81,6 +81,11 @@ public class AtlasFileSpoolTest extends BaseTest { public void close() { } + + @Override + public boolean isReady(NotificationType type) { + return true; + } } @Test diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java index cc727c6..ddfb008 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java @@ -339,6 +339,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang createCommonVertexIndex(management, IS_INCOMPLETE_PROPERTY_KEY, UniqueKind.NONE, Integer.class, SINGLE, true, true); createCommonVertexIndex(management, CUSTOM_ATTRIBUTES_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false); createCommonVertexIndex(management, LABELS_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false); + createCommonVertexIndex(management, 
ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY, UniqueKind.NONE, Long.class, SINGLE, true, false); createCommonVertexIndex(management, PATCH_ID_PROPERTY_KEY, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false); createCommonVertexIndex(management, PATCH_DESCRIPTION_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityCorrelationStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityCorrelationStore.java new file mode 100644 index 0000000..4760757 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/EntityCorrelationStore.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.store.graph; + +import org.apache.atlas.annotation.GraphTransaction; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@Component +public class EntityCorrelationStore { + private static final Logger LOG = LoggerFactory.getLogger(EntityCorrelationStore.class); + + public EntityCorrelationStore() { + } + + @GraphTransaction + public void add(String entityGuid, long messageTimestamp) { + AtlasVertex v = AtlasGraphUtilsV2.findByGuid(entityGuid); + if (v == null) { + LOG.warn("Fetching: {} did not yield result!", entityGuid); + return; + } + + AtlasGraphUtilsV2.setEncodedProperty(v, Constants.ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY, messageTimestamp); + LOG.info("Updating: {}: {}", entityGuid, messageTimestamp); + } + + public String findCorrelatedGuid(String qualifiedName, long messageTimestamp) { + String guid = AtlasGraphUtilsV2.findFirstDeletedDuringSpooledByQualifiedName(qualifiedName, messageTimestamp); + + LOG.info("findCorrelatedGuid: {} - {} -> {}", qualifiedName, messageTimestamp, guid); + return guid; + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java index 0a94708..e73f084 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java @@ -367,6 +367,27 @@ public class AtlasGraphUtilsV2 { return vertex; } + public static String findFirstDeletedDuringSpooledByQualifiedName(String qualifiedName, long timestamp) { + return findFirstDeletedDuringSpooledByQualifiedName(getGraphInstance(), qualifiedName, 
timestamp); + } + + public static String findFirstDeletedDuringSpooledByQualifiedName(AtlasGraph graph, String qualifiedName, long timestamp) { + MetricRecorder metric = RequestContext.get().startMetricRecord("findDeletedDuringSpooledByQualifiedName"); + + AtlasGraphQuery query = graph.query().has(STATE_PROPERTY_KEY, Status.DELETED.name()) + .has(Constants.ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.GREATER_THAN, timestamp) + .has(Constants.QUALIFIED_NAME, qualifiedName) + .orderBy(Constants.ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY, ASC); + + Iterator iterator = query.vertices().iterator(); + + String ret = iterator.hasNext() ? GraphHelper.getGuid((AtlasVertex) iterator.next()) : null; + + RequestContext.get().endMetricRecord(metric); + + return ret; + } + public static AtlasVertex findByGuid(String guid) { return findByGuid(getGraphInstance(), guid); } diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/EntityCorrelationStoreTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/EntityCorrelationStoreTest.java new file mode 100644 index 0000000..a3be5f4 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/EntityCorrelationStoreTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2; + +import org.apache.atlas.BasicTestSetup; +import org.apache.atlas.TestModules; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.EntityCorrelationStore; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.utils.TestResourceFileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import javax.inject.Inject; +import java.io.IOException; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; + +@Guice(modules = TestModules.TestOnlyModule.class) +public class EntityCorrelationStoreTest extends BasicTestSetup { + @Inject + AtlasGraph graph; + + @BeforeClass + public void setup() throws Exception { + super.initialize(); + + setupTestData(); + } + + /** + * Creates and deletes the entity from the "db01" test resource, records it in the correlation store at + * timestamp 2, then checks that: an unknown qualifiedName is not correlated; a lookup at timestamp 1 returns + * the deleted entity's GUID; and a lookup at timestamp 2 (not earlier than the recorded one) returns nothing. + */ + @Test + public void verify() throws IOException, AtlasBaseException { + final String nonExistentQName = "db01@cm"; + final String db01QName = "db01x@cm"; + final EntityCorrelationStore entityCorrelationStore = new EntityCorrelationStore(); + + String db01 = TestResourceFileUtils.getJson("entities", "db01"); + + AtlasEntity.AtlasEntitiesWithExtInfo db = AtlasType.fromJson(db01, AtlasEntity.AtlasEntitiesWithExtInfo.class); + EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(db), false); + + String dbGuid = response.getFirstEntityCreated().getGuid(); +
entityStore.deleteById(dbGuid); + + /* record the deleted entity for correlation at timestamp 2 */ + entityCorrelationStore.add(dbGuid, 2L); + graph.commit(); + + String guid = entityCorrelationStore.findCorrelatedGuid(nonExistentQName, 1L); + assertNull(guid, "no entity should be correlated for an unknown qualifiedName"); + + String fetchedGuid = entityCorrelationStore.findCorrelatedGuid(db01QName, 1L); + assertNotNull(fetchedGuid, "deleted entity should be correlated for a timestamp earlier than the recorded one"); + assertEquals(fetchedGuid, dbGuid); + + guid = entityCorrelationStore.findCorrelatedGuid(db01QName, 2L); + assertNull(guid, "deleted entity should not be correlated for a timestamp equal to the recorded one"); + } +} diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityCorrelationManager.java b/webapp/src/main/java/org/apache/atlas/notification/EntityCorrelationManager.java new file mode 100644 index 0000000..f1d6aff --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/notification/EntityCorrelationManager.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.atlas.notification; + +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.repository.store.graph.EntityCorrelationStore; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Records entities deleted while processing spooled messages in an {@link EntityCorrelationStore}, and looks up + * the GUID of such a deleted entity by qualifiedName and spooled-message timestamp. + * When constructed with a null store, correlation is disabled: add() does nothing and lookups return null. + */ +public class EntityCorrelationManager { + private static final Logger LOG = LoggerFactory.getLogger(EntityCorrelationManager.class); + + private final EntityCorrelationStore entityCorrelationStore; + + public EntityCorrelationManager(EntityCorrelationStore entityCorrelationStore) { + this.entityCorrelationStore = entityCorrelationStore; + } + + /** + * Records the given deleted entities in the correlation store; does nothing for non-spooled messages. + * + * @param spooled true if the message that caused the deletes was spooled + * @param spooledTimestamp message timestamp recorded against each deleted entity + * @param entityHeaders headers of the deleted entities; null entries and entries without a GUID are skipped + */ + public void add(boolean spooled, long spooledTimestamp, List<AtlasEntityHeader> entityHeaders) { + if (entityCorrelationStore == null || !spooled || CollectionUtils.isEmpty(entityHeaders)) { + return; + } + + for (AtlasEntityHeader entityHeader : entityHeaders) { + String guid = entityHeader != null ? entityHeader.getGuid() : null; + + if (StringUtils.isNotEmpty(guid)) { + entityCorrelationStore.add(guid, spooledTimestamp); + } + } + } + + /** + * Returns the GUID of a deleted entity with the given qualifiedName that should be correlated with a spooled + * message created at the given timestamp. + * + * @param qualifiedName qualifiedName of the referenced entity; null or empty yields null without a store lookup + * @param spooledMessageTimestamp creation timestamp of the spooled message; non-positive values yield null + * @return the correlated GUID, or null if there is none + */ + public String getGuidForDeletedEntityToBeCorrelated(String qualifiedName, long spooledMessageTimestamp) { + if (entityCorrelationStore == null || spooledMessageTimestamp <= 0 || StringUtils.isEmpty(qualifiedName)) { + return null; + } + + String guid = entityCorrelationStore.findCorrelatedGuid(qualifiedName, spooledMessageTimestamp); + + /* lookups run for every referenced entity of a spooled message; keep misses out of the INFO log */ + if (StringUtils.isNotEmpty(guid)) { + LOG.info("{}: spooledTimestamp: {} -> {}", qualifiedName, spooledMessageTimestamp, guid); + } else { + LOG.debug("{}: spooledTimestamp: {} -> {}", qualifiedName, spooledMessageTimestamp, guid); + } + + return guid; + } +} diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 84cc8d8..5643af9 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -40,6 +40,7 @@ import
org.apache.atlas.notification.NotificationInterface.NotificationType; import org.apache.atlas.notification.preprocessor.EntityPreprocessor; import org.apache.atlas.notification.preprocessor.PreprocessorContext; import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction; +import org.apache.atlas.repository.store.graph.EntityCorrelationStore; import org.apache.atlas.util.AtlasMetricsCounter; import org.apache.atlas.utils.AtlasJson; import org.apache.atlas.utils.LruCache; @@ -191,6 +192,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private ExecutorService executors; private Instant nextStatsLogTime = AtlasMetricsCounter.getNextHourStartTime(Instant.now()); private final Map<TopicPartition, Long> lastCommittedPartitionOffset; + private final EntityCorrelationManager entityCorrelationManager; @VisibleForTesting final int consumerRetryInterval; @@ -201,7 +203,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl @Inject public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore, ServiceState serviceState, AtlasInstanceConverter instanceConverter, - AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil) throws AtlasException { + AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil, + EntityCorrelationStore entityCorrelationStore) throws AtlasException { this.notificationInterface = notificationInterface; this.atlasEntityStore = atlasEntityStore; this.serviceState = serviceState; @@ -308,7 +311,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl hiveTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, true); rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true); preprocessEnabled = skipHiveColumnLineageHive20633 || 
updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty(); - + entityCorrelationManager = new EntityCorrelationManager(entityCorrelationStore); LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633); LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold); LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, hiveTypesRemoveOwnedRefAttrs); @@ -688,6 +691,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue())); stats.updateStats(response); + entityCorrelationManager.add(kafkaMsg.getSpooled(), kafkaMsg.getMsgCreated(), response.getDeletedEntities()); } catch (ClassCastException cle) { LOG.error("Failed to delete entity {}", deleteRequest); } @@ -770,6 +774,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, entity.getUniqueAttributes()); stats.updateStats(response); + entityCorrelationManager.add(kafkaMsg.getSpooled(), kafkaMsg.getMsgCreated(), response.getDeletedEntities()); } } catch (ClassCastException cle) { LOG.error("Failed to do delete entities {}", entities); @@ -889,6 +894,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl RequestContext.get().resetEntityGuidUpdates(); + entityCorrelationManager.add(context.isSpooledMessage(), context.getMsgCreated(), response.getDeletedEntities()); + RequestContext.get().clearCache(); } } @@ -973,7 +980,9 @@ 
public class NotificationHookConsumer implements Service, ActiveStateChangeHandl PreprocessorContext context = null; if (preprocessEnabled) { - context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs, updateHiveProcessNameWithQualifiedName); + context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, + hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs, + rdbmsTypesRemoveOwnedRefAttrs, updateHiveProcessNameWithQualifiedName, entityCorrelationManager); if (context.isHivePreprocessEnabled()) { preprocessHiveTypes(context); diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java index 89568e2..7f0cafe 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java @@ -32,6 +32,8 @@ public abstract class EntityPreprocessor { public static final String TYPE_HIVE_PROCESS = "hive_process"; public static final String TYPE_HIVE_STORAGEDESC = "hive_storagedesc"; public static final String TYPE_HIVE_DB = "hive_db"; + public static final String TYPE_HIVE_DB_DDL = "hive_db_ddl"; + public static final String TYPE_HIVE_TABLE_DDL = "hive_table_ddl"; public static final String TYPE_HIVE_TABLE = "hive_table"; public static final String TYPE_RDBMS_INSTANCE = "rdbms_instance"; public static final String TYPE_RDBMS_DB = "rdbms_db"; @@ -71,11 +73,13 @@ public abstract class EntityPreprocessor { static { EntityPreprocessor[] hivePreprocessors = new EntityPreprocessor[] { new HivePreprocessor.HiveDbPreprocessor(), + new 
HiveDbDDLPreprocessor(), new HivePreprocessor.HiveTablePreprocessor(), new HivePreprocessor.HiveColumnPreprocessor(), new HivePreprocessor.HiveProcessPreprocessor(), new HivePreprocessor.HiveColumnLineageProcessPreprocessor(), - new HivePreprocessor.HiveStorageDescPreprocessor() + new HivePreprocessor.HiveStorageDescPreprocessor(), + new HiveTableDDLPreprocessor() }; EntityPreprocessor[] rdbmsPreprocessors = new EntityPreprocessor[] { @@ -158,6 +162,16 @@ public abstract class EntityPreprocessor { return ret != null ? ret.toString() : null; } + /** + * Overwrites the GUID of an object-id reference. An {@link AtlasObjectId} gets its guid set; a Map gets its + * "guid" entry set; any other value (including null) is left untouched. + */ + @SuppressWarnings("unchecked") + public void setObjectIdWithGuid(Object obj, String guid) { + if (obj instanceof AtlasObjectId) { + ((AtlasObjectId) obj).setGuid(guid); + return; + } + + if (obj instanceof Map) { + ((Map<String, Object>) obj).put("guid", guid); + } + } + protected boolean isEmpty(Object obj) { return obj == null || ((obj instanceof Collection) && ((Collection) obj).isEmpty()); } diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java new file mode 100644 index 0000000..dcff093 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.preprocessor; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Preprocessor for hive_db_ddl entities in spooled messages: when the entity's db reference matches (by + * qualifiedName) a deleted entity known to the correlation manager, the reference is re-pointed to that GUID. + */ +public class HiveDbDDLPreprocessor extends EntityPreprocessor { + private static final Logger LOG = LoggerFactory.getLogger(HiveDbDDLPreprocessor.class); + + protected HiveDbDDLPreprocessor() { + super(TYPE_HIVE_DB_DDL); + } + + @Override + public void preprocess(AtlasEntity entity, PreprocessorContext context) { + if (!context.isSpooledMessage()) { + return; + } + + Object dbObject = entity.getRelationshipAttribute(ATTRIBUTE_DB); + if (dbObject == null) { + return; + } + + String qualifiedName = getQualifiedName(dbObject); + if (StringUtils.isEmpty(qualifiedName)) { + return; + } + + String guid = context.getGuidForDeletedEntity(qualifiedName); + if (StringUtils.isEmpty(guid)) { + return; + } + + setObjectIdWithGuid(dbObject, guid); + LOG.info("{}: Preprocessor: Updated: {} -> {}", getTypeName(), qualifiedName, guid); + } +} diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java index 86e3384..bf6a623 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java @@ -228,6 +228,34 @@ public class HivePreprocessor { } } } + + preprocessCheckpoint(entity, context); + } + + /** + * For spooled messages, re-points elements of the inputs and outputs relationship attributes to the GUIDs + * of correlated deleted entities. + */ + private void preprocessCheckpoint(AtlasEntity entity,
PreprocessorContext context) { + if (!context.isSpooledMessage()) { + return; + } + + String[] relationshipNames = new String[]{ATTRIBUTE_INPUTS, ATTRIBUTE_OUTPUTS}; + for (String relationshipName : relationshipNames) { + Object val = entity.getRelationshipAttribute(relationshipName); + if (!isEmpty(val) && val instanceof List) { + updateListWithGuids(context, (List) val); + } + } + } + + /** + * Sets the correlated GUID on each list element whose qualifiedName matches a deleted entity; elements + * without a qualifiedName, or without a correlated GUID, are left unchanged. + */ + private void updateListWithGuids(PreprocessorContext context, List list) { + for (Object o : list) { + String qn = getQualifiedName(o); + if (StringUtils.isEmpty(qn)) { + continue; + } + + String guid = context.getGuidForDeletedEntity(qn); + if (StringUtils.isEmpty(guid)) { + continue; + } + + setObjectIdWithGuid(o, guid); + } } private int getCollectionSize(Object obj) { diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java new file mode 100644 index 0000000..83d4d7c --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.atlas.notification.preprocessor; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Preprocessor for hive_table_ddl entities in spooled messages: when the entity's table reference matches (by + * qualifiedName) a deleted entity known to the correlation manager, the reference is re-pointed to that GUID. + */ +public class HiveTableDDLPreprocessor extends EntityPreprocessor { + private static final Logger LOG = LoggerFactory.getLogger(HiveTableDDLPreprocessor.class); + + protected HiveTableDDLPreprocessor() { + super(TYPE_HIVE_TABLE_DDL); + } + + @Override + public void preprocess(AtlasEntity entity, PreprocessorContext context) { + if (!context.isSpooledMessage()) { + return; + } + + Object tableObject = entity.getRelationshipAttribute(ATTRIBUTE_TABLE); + if (tableObject == null) { + return; + } + + String qualifiedName = getQualifiedName(tableObject); + if (StringUtils.isEmpty(qualifiedName)) { + return; + } + + String guid = context.getGuidForDeletedEntity(qualifiedName); + if (StringUtils.isEmpty(guid)) { + return; + } + + setObjectIdWithGuid(tableObject, guid); + LOG.info("{}: Preprocessor: Updated: {} -> {}", getTypeName(), qualifiedName, guid); + } +} diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java index 608b4a3..59f6440 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java @@ -23,6 +23,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasRelatedObjectId; import org.apache.atlas.model.notification.HookNotification; +import org.apache.atlas.notification.EntityCorrelationManager; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeUtil; import org.apache.commons.collections.CollectionUtils; @@ -69,9 +70,10 @@ public class PreprocessorContext { private final Set<String>
createdEntities = new HashSet<>(); private final Set<String> deletedEntities = new HashSet<>(); private final Map<String, String> guidAssignments = new HashMap<>(); + private final EntityCorrelationManager correlationManager; private List<AtlasEntity> postUpdateEntities = null; - public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs, boolean updateHiveProcessNameWithQualifiedName) { + public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs, boolean updateHiveProcessNameWithQualifiedName, EntityCorrelationMana [...] 
this.kafkaMessage = kafkaMessage; this.typeRegistry = typeRegistry; this.hiveTablesToIgnore = hiveTablesToIgnore; @@ -101,6 +103,7 @@ public class PreprocessorContext { } this.isHivePreProcessEnabled = hiveTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty() || updateHiveProcessNameWithQualifiedName; + this.correlationManager = correlationManager; } public AtlasKafkaMessage<HookNotification> getKafkaMessage() { @@ -577,4 +580,16 @@ public class PreprocessorContext { partialEntity.setAttribute(attrName, attrVal); } } + + /** + * @return the msgCreated timestamp of the Kafka message being preprocessed + */ + public long getMsgCreated() { + return kafkaMessage.getMsgCreated(); + } + + /** + * @return true if the Kafka message being preprocessed was spooled + */ + public boolean isSpooledMessage() { + return kafkaMessage.getSpooled(); + } + + /** + * Returns the GUID of a deleted entity with the given qualifiedName that should be correlated with this + * message, based on the message's msgCreated timestamp. + * + * @param qualifiedName qualifiedName of the referenced entity + * @return the correlated GUID, or null if none is found or no correlation manager was supplied + */ + public String getGuidForDeletedEntity(String qualifiedName) { + if (correlationManager == null) { + return null; + } + + return correlationManager.getGuidForDeletedEntityToBeCorrelated(qualifiedName, getMsgCreated()); + } } diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java index 65e8b50..fdfc256 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java @@ -109,7 +109,7 @@ public class NotificationHookConsumerKafkaTest { produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity())); NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, false); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface,
atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); consumeOneMessage(consumer, hookConsumer); @@ -128,7 +128,7 @@ public class NotificationHookConsumerKafkaTest { public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException { ExceptionThrowingCommitConsumer consumer = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity())); @@ -159,7 +159,7 @@ public class NotificationHookConsumerKafkaTest { assertNotNull (consumer); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); consumeOneMessage(consumer, hookConsumer); diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java index 15a1900..f440c42 100644 --- 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java @@ -98,7 +98,7 @@ public class NotificationHookConsumerTest { @Test public void testConsumerCanProceedIfServerIsReady() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); @@ -111,7 +111,7 @@ public class NotificationHookConsumerTest { @Test public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); @@ -128,7 +128,7 @@ public class NotificationHookConsumerTest { @Test public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, 
metricsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null); NotificationConsumer consumer = mock(NotificationConsumer.class); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); EntityCreateRequest message = mock(EntityCreateRequest.class); @@ -145,7 +145,7 @@ public class NotificationHookConsumerTest { @Test public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null); NotificationConsumer consumer = mock(NotificationConsumer.class); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); EntityCreateRequest message = new EntityCreateRequest("user", Collections.singletonList(mock(Referenceable.class))); @@ -159,7 +159,7 @@ public class NotificationHookConsumerTest { @Test public void testConsumerProceedsWithFalseIfInterrupted() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.Timer timer = 
mock(NotificationHookConsumer.Timer.class); @@ -179,7 +179,7 @@ public class NotificationHookConsumerTest { when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null); notificationHookConsumer.startInternal(configuration, executorService); verify(notificationInterface).createConsumers(NotificationType.HOOK, 1); @@ -197,7 +197,7 @@ public class NotificationHookConsumerTest { when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null); notificationHookConsumer.startInternal(configuration, executorService); @@ -216,7 +216,7 @@ public class NotificationHookConsumerTest { when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - NotificationHookConsumer 
notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null); notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.instanceIsActive(); @@ -236,7 +236,7 @@ public class NotificationHookConsumerTest { when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); + final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null); doAnswer(new Answer() { @Override @@ -267,7 +267,7 @@ public class NotificationHookConsumerTest { when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); + final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null); 
notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.instanceIsPassive(); @@ -332,6 +332,6 @@ public class NotificationHookConsumerTest { when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(notificationConsumerMock.receive()).thenThrow(new IllegalStateException()); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); + return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil, null); } }