This is an automated email from the ASF dual-hosted git repository. radhikakundam pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new 8d8169e83 ATLAS-4335: Hook Notifications through Rest Interface 8d8169e83 is described below commit 8d8169e83ac4acfb7b152594eac96ddbc9bb8437 Author: radhikakundam <radhikakun...@apache.org> AuthorDate: Tue Dec 6 11:23:22 2022 -0800 ATLAS-4335: Hook Notifications through Rest Interface Signed-off-by: radhikakundam <radhikakun...@apache.org> --- addons/falcon-bridge/pom.xml | 5 - addons/hbase-bridge/pom.xml | 8 +- addons/hive-bridge/pom.xml | 5 + addons/impala-bridge/pom.xml | 5 - addons/sqoop-bridge/pom.xml | 5 - .../org/apache/atlas/authorize/AtlasPrivilege.java | 4 +- .../main/java/org/apache/atlas/AtlasClientV2.java | 15 +- .../java/org/apache/atlas/AtlasConfiguration.java | 5 + .../main/java/org/apache/atlas/AtlasErrorCode.java | 4 +- notification/pom.xml | 4 +- .../main/java/org/apache/atlas/hook/AtlasHook.java | 6 + .../org/apache/atlas/kafka/AtlasKafkaConsumer.java | 26 +++- .../org/apache/atlas/kafka/AtlasKafkaMessage.java | 18 ++- .../org/apache/atlas/kafka/KafkaNotification.java | 77 ++++++++++- .../apache/atlas/kafka/NotificationProvider.java | 31 +++-- .../AtlasNotificationMessageDeserializer.java | 7 + .../atlas/notification/NotificationConsumer.java | 7 + .../atlas/notification/NotificationInterface.java | 3 + .../atlas/notification/rest/RestNotification.java | 153 +++++++++++++++++++++ .../AbstractNotificationConsumerTest.java | 5 + .../atlas/notification/RestNotificationTest.java | 136 ++++++++++++++++++ .../notification/NotificationHookConsumer.java | 117 ++++++++++++++-- .../apache/atlas/web/rest/NotificationREST.java | 121 ++++++++++++++++ .../atlas/web/integration/NotificationRestIT.java | 73 ++++++++++ .../json/notifications/create-db-ddl.json | 1 + .../resources/json/notifications/create-db.json | 1 + .../resources/json/notifications/delete-db.json | 1 + 27 files changed, 789 insertions(+), 54 deletions(-) diff --git a/addons/falcon-bridge/pom.xml 
b/addons/falcon-bridge/pom.xml index 1e2ce7c81..3576173b9 100644 --- a/addons/falcon-bridge/pom.xml +++ b/addons/falcon-bridge/pom.xml @@ -185,11 +185,6 @@ <artifactId>jersey-json</artifactId> <version>${jersey.version}</version> </artifactItem> - <artifactItem> - <groupId>javax.ws.rs</groupId> - <artifactId>jsr311-api</artifactId> - <version>${jsr.version}</version> - </artifactItem> </artifactItems> </configuration> </execution> diff --git a/addons/hbase-bridge/pom.xml b/addons/hbase-bridge/pom.xml index ca598ab39..d78abbb37 100644 --- a/addons/hbase-bridge/pom.xml +++ b/addons/hbase-bridge/pom.xml @@ -63,7 +63,7 @@ <dependency> <groupId>com.sun.jersey</groupId> <artifactId>jersey-bundle</artifactId> - <version>1.19</version> + <version>${jersey.version}</version> <scope>test</scope> </dependency> @@ -399,11 +399,6 @@ <artifactId>jersey-bundle</artifactId> <version>${jersey.version}</version> </artifactItem> - <artifactItem> - <groupId>javax.ws.rs</groupId> - <artifactId>jsr311-api</artifactId> - <version>${jsr.version}</version> - </artifactItem> </artifactItems> </configuration> </execution> @@ -621,7 +616,6 @@ </execution> </executions> </plugin> - </plugins> </build> </project> diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml index 35bd4e553..3464b65f0 100755 --- a/addons/hive-bridge/pom.xml +++ b/addons/hive-bridge/pom.xml @@ -311,6 +311,11 @@ <artifactId>jersey-json</artifactId> <version>${jersey.version}</version> </artifactItem> + <artifactItem> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + <version>${jersey.version}</version> + </artifactItem> </artifactItems> </configuration> </execution> diff --git a/addons/impala-bridge/pom.xml b/addons/impala-bridge/pom.xml index a9759b36d..915dcbb4b 100644 --- a/addons/impala-bridge/pom.xml +++ b/addons/impala-bridge/pom.xml @@ -325,11 +325,6 @@ <artifactId>jersey-json</artifactId> <version>${jersey.version}</version> </artifactItem> - <artifactItem> - 
<groupId>javax.ws.rs</groupId> - <artifactId>jsr311-api</artifactId> - <version>${jsr.version}</version> - </artifactItem> </artifactItems> </configuration> </execution> diff --git a/addons/sqoop-bridge/pom.xml b/addons/sqoop-bridge/pom.xml index e970c8011..4b6eac98f 100644 --- a/addons/sqoop-bridge/pom.xml +++ b/addons/sqoop-bridge/pom.xml @@ -249,11 +249,6 @@ <artifactId>jersey-json</artifactId> <version>${jersey.version}</version> </artifactItem> - <artifactItem> - <groupId>javax.ws.rs</groupId> - <artifactId>jsr311-api</artifactId> - <version>${jsr.version}</version> - </artifactItem> </artifactItems> </configuration> </execution> diff --git a/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java b/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java index 5d06e1b29..f270844c5 100644 --- a/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java +++ b/authorization/src/main/java/org/apache/atlas/authorize/AtlasPrivilege.java @@ -46,7 +46,9 @@ public enum AtlasPrivilege { TYPE_READ("type-read"), - ADMIN_AUDITS("admin-audits"); + ADMIN_AUDITS("admin-audits"), + + SERVICE_NOTIFICATION_POST("service-notification-post"); private final String type; diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java index 6910b0e42..4970baaa9 100644 --- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java +++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java @@ -134,6 +134,9 @@ public class AtlasClientV2 extends AtlasBaseClient { private static final String GLOSSARY_CATEGORY = GLOSSARY_URI + "/category"; private static final String GLOSSARY_CATEGORIES = GLOSSARY_URI + "/categories"; + //Notification APIs + private static final String NOTIFICATION_URI = BASE_URI + "v2/notification"; + public AtlasClientV2(String[] baseUrl, String[] basicAuthUserNamePassword) { super(baseUrl, 
basicAuthUserNamePassword); @@ -173,7 +176,7 @@ public class AtlasClientV2 extends AtlasBaseClient { } @VisibleForTesting - AtlasClientV2(WebResource service, Configuration configuration) { + public AtlasClientV2(WebResource service, Configuration configuration) { super(service, configuration); } @@ -1024,6 +1027,14 @@ public class AtlasClientV2 extends AtlasBaseClient { return callAPI(API_V2.IMPORT_GLOSSARY, BulkImportResponse.class, multipartEntity); } + public void postNotificationToTopic(String topic, List<String> messages) throws AtlasServiceException { + callAPI(formatPathParameters(API_V2.POST_NOTIFICATIONS_TO_TOPIC, topic), (Class<?>) null, messages); + } + + @VisibleForTesting + public API formatPathWithParameter(API api, String... params) { + return formatPathParameters(api, params); + } @Override protected API formatPathParameters(API api, String... params) { @@ -1199,6 +1210,8 @@ public class AtlasClientV2 extends AtlasBaseClient { public static final API_V2 GET_BUSINESS_METADATA_TEMPLATE = new API_V2(ENTITY_API + "businessmetadata/import/template", HttpMethod.GET, Response.Status.OK, MediaType.APPLICATION_JSON, MediaType.APPLICATION_OCTET_STREAM); public static final API_V2 IMPORT_BUSINESS_METADATA = new API_V2(ENTITY_API + "businessmetadata/import", HttpMethod.POST, Response.Status.OK, MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON); + public static final API_V2 POST_NOTIFICATIONS_TO_TOPIC = new API_V2(NOTIFICATION_URI + "/topic/%s", HttpMethod.POST, Response.Status.NO_CONTENT); + // labels APIs public static final API_V2 ADD_LABELS = new API_V2(ENTITY_API + "guid/%s/labels", HttpMethod.PUT, Response.Status.NO_CONTENT); public static final API_V2 ADD_LABELS_BY_UNIQUE_ATTRIBUTE = new API_V2(ENTITY_API + "uniqueAttribute/type/%s/labels", HttpMethod.PUT, Response.Status.NO_CONTENT); diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index e8c7a15ea..df886753f 
100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -40,9 +40,14 @@ public enum AtlasConfiguration { NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name", "ATLAS_HOOK"), NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name", "ATLAS_ENTITIES"), + NOTIFICATION_HOOK_CONSUMER_BUFFERING_INTERVAL("atlas.notification.consumer.message.buffering.interval.seconds", 15), + NOTIFICATION_HOOK_CONSUMER_BUFFERING_BATCH_SIZE("atlas.notification.consumer.message.buffering.batch.size", 100), + + NOTIFICATION_HOOK_REST_ENABLED("atlas.hook.rest.notification.enabled", false), NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES("atlas.notification.hook.consumer.topic.names", "ATLAS_HOOK"), // a comma separated list of topic names NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES("atlas.notification.entities.consumer.topic.names", "ATLAS_ENTITIES"), // a comma separated list of topic names + NOTIFICATION_REST_BODY_MAX_LENGTH_BYTES("atlas.notification.rest.body.max.length.bytes", (1 * 1024 * 1024)), NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes", (1000 * 1000)), NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true), NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60), diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java index 21ac7f78e..77a6fd8c3 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java +++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java @@ -176,6 +176,7 @@ public enum AtlasErrorCode { ATTRIBUTE_NAME_ALREADY_EXISTS_IN_ANOTHER_PARENT_TYPE(400, "ATLAS-400-00-09E", "Invalid attribute name: {0}.{1}. Attribute already exists in another parent type: {2}"), IMPORT_INVALID_ZIP_ENTRY(400, "ATLAS-400-00-09F", "{0}: invalid zip entry. 
Reason: {1}"), LINEAGE_ON_DEMAND_NOT_ENABLED(400, "ATLAS-400-00-100", "Lineage on demand config: {0} is not enabled"), + INVALID_TOPIC_NAME(400, "ATLAS-400-00-101", "Unsupported topic name : {0}"), UNAUTHORIZED_ACCESS(403, "ATLAS-403-00-001", "{0} is not authorized to perform {1}"), @@ -243,7 +244,8 @@ public enum AtlasErrorCode { ENTITY_NOTIFICATION_FAILED(500, "ATLAS-500-00-014", "Notification failed for operation: {0} : {1}"), FAILED_TO_UPLOAD(500, "ATLAS-500-00-015", "Error occurred while uploading the file: {0}"), FAILED_TO_CREATE_GLOSSARY_TERM(500, "ATLAS-500-00-016", "Error occurred while creating glossary term: {0}"), - FAILED_TO_UPDATE_GLOSSARY_TERM(500, "ATLAS-500-00-017", "Error occurred while updating glossary term: {0}"); + FAILED_TO_UPDATE_GLOSSARY_TERM(500, "ATLAS-500-00-017", "Error occurred while updating glossary term: {0}"), + NOTIFICATION_EXCEPTION(500, "ATLAS-500-00-018", "{0}"); private String errorCode; private String errorMessage; diff --git a/notification/pom.xml b/notification/pom.xml index aaf11c7a4..4e5d9a835 100644 --- a/notification/pom.xml +++ b/notification/pom.xml @@ -42,7 +42,9 @@ <dependency> <groupId>org.apache.atlas</groupId> - <artifactId>atlas-server-api</artifactId> + <artifactId>atlas-client-v2</artifactId> + <version>${project.version}</version> + <scope>provided</scope> </dependency> <dependency> diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java index 24ea6ea83..4c70aedb9 100644 --- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java +++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java @@ -21,6 +21,7 @@ package org.apache.atlas.hook; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasConstants; import 
org.apache.atlas.kafka.NotificationProvider; import org.apache.atlas.model.notification.HookNotification; @@ -63,6 +64,7 @@ public abstract class AtlasHook { public static final String CONF_METADATA_NAMESPACE = "atlas.metadata.namespace"; public static final String CLUSTER_NAME_KEY = "atlas.cluster.name"; public static final String DEFAULT_CLUSTER_NAME = "primary"; + public static final String CONF_ATLAS_HOOK_MESSAGES_SORT_ENABLED = "atlas.hook.messages.sort.enabled"; protected static Configuration atlasProperties; protected static NotificationInterface notificationInterface; @@ -75,6 +77,8 @@ public abstract class AtlasHook { private static final int notificationMaxRetries; private static final int notificationRetryInterval; private static ExecutorService executor = null; + public static final boolean isRESTNotificationEnabled; + public static final boolean isHookMsgsSortEnabled; static { @@ -95,6 +99,8 @@ public abstract class AtlasHook { failedMessagesLogger = null; } + isRESTNotificationEnabled = AtlasConfiguration.NOTIFICATION_HOOK_REST_ENABLED.getBoolean(); + isHookMsgsSortEnabled = atlasProperties.getBoolean(CONF_ATLAS_HOOK_MESSAGES_SORT_ENABLED, isRESTNotificationEnabled); metadataNamespace = getMetadataNamespace(atlasProperties); notificationMaxRetries = atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3); notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000); diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java index 96dc5856a..89ec59caa 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java +++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java @@ -73,6 +73,11 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> { return receive(this.pollTimeoutMilliSeconds, lastCommittedPartitionOffset); } + @Override + public 
List<AtlasKafkaMessage<T>> receiveRawRecordsWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) { + return receiveRawRecords(this.pollTimeoutMilliSeconds, lastCommittedPartitionOffset); + } + @Override public void commit(TopicPartition partition, long offset) { @@ -98,7 +103,15 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> { } } + private List<AtlasKafkaMessage<T>> receiveRawRecords(long timeoutMilliSeconds, Map<TopicPartition, Long> lastCommittedPartitionOffset) { + return receive(timeoutMilliSeconds, lastCommittedPartitionOffset, true); + } + private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPartition, Long> lastCommittedPartitionOffset) { + return receive(timeoutMilliSeconds, lastCommittedPartitionOffset, false); + } + + private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPartition, Long> lastCommittedPartitionOffset, boolean isRawDataRequired) { List<AtlasKafkaMessage<T>> messages = new ArrayList(); ConsumerRecords<?, ?> records = kafkaConsumer != null ? 
kafkaConsumer.poll(timeoutMilliSeconds) : null; @@ -134,8 +147,17 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> { continue; } - messages.add(new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition(), - deserializer.getMsgCreated(), deserializer.getSpooled())); + AtlasKafkaMessage kafkaMessage = null; + + if (isRawDataRequired) { + kafkaMessage = new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition(), + deserializer.getMsgCreated(), deserializer.getSpooled(), deserializer.getSource(), record.value().toString()); + } else { + kafkaMessage = new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition(), + deserializer.getMsgCreated(), deserializer.getSpooled(), deserializer.getSource()); + } + + messages.add(kafkaMessage); } } diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java index af3727df4..390eca7ba 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java +++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java @@ -26,13 +26,25 @@ public class AtlasKafkaMessage<T> { private final TopicPartition topicPartition; private final boolean spooled; private final long msgCreated; + private final String source; + private final String rawRecordData; - public AtlasKafkaMessage(T message, long offset, String topic, int partition, long msgCreated, boolean spooled) { + public AtlasKafkaMessage(T message, long offset, String topic, int partition, long msgCreated, boolean spooled, String source, String rawRecordData) { this.message = message; this.offset = offset; this.topicPartition = new TopicPartition(topic, partition); this.msgCreated = msgCreated; this.spooled = spooled; + this.source = source; + this.rawRecordData = rawRecordData; + } + + public AtlasKafkaMessage(T message, long offset, String topic, int 
partition, long msgCreated, boolean spooled, String source) { + this(message, offset, topic, partition, msgCreated, spooled, source, null); + } + + public AtlasKafkaMessage(T message, long offset, String topic, int partition, long msgCreated, boolean spooled) { + this(message, offset, topic, partition, msgCreated, spooled, null); } public AtlasKafkaMessage(T message, long offset, String topic, int partition) { @@ -66,4 +78,8 @@ public class AtlasKafkaMessage<T> { public long getMsgCreated() { return this.msgCreated; } + + public String getSource() { return this.source; } + + public String getRawRecordData() { return this.rawRecordData; } } diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 32f5183a0..870d50814 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -58,6 +58,7 @@ public class KafkaNotification extends AbstractNotification implements Service { public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class); public static final String PROPERTY_PREFIX = "atlas.kafka"; + public static final String UNSORTED_POSTFIX = "_UNSORTED"; public static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString(); public static final String ATLAS_ENTITIES_TOPIC = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString(); protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id"; @@ -67,9 +68,27 @@ public class KafkaNotification extends AbstractNotification implements Service { private static final String DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE = "This consumer has already been closed."; + public static String ATLAS_HOOK_TOPIC_UNSORTED; + public static String[] ATLAS_HOOK_UNSORTED_CONSUMER_TOPICS; + + static { + try { + ATLAS_HOOK_TOPIC_UNSORTED = ATLAS_HOOK_TOPIC + 
UNSORTED_POSTFIX; + ATLAS_HOOK_UNSORTED_CONSUMER_TOPICS = ATLAS_HOOK_CONSUMER_TOPICS != null && ATLAS_HOOK_CONSUMER_TOPICS.length > 0 + ? new String[ATLAS_HOOK_CONSUMER_TOPICS.length] : new String[] {ATLAS_HOOK_TOPIC_UNSORTED}; + + for (int i = 0; i < ATLAS_HOOK_CONSUMER_TOPICS.length; i++) { + ATLAS_HOOK_UNSORTED_CONSUMER_TOPICS[i] = ATLAS_HOOK_CONSUMER_TOPICS[i] + UNSORTED_POSTFIX; + } + } catch (Exception e) { + LOG.error("Error while initializing Kafka Notification", e); + } + } + private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP = new HashMap<NotificationType, String>() { { put(NotificationType.HOOK, ATLAS_HOOK_TOPIC); + put(NotificationType.HOOK_UNSORTED, ATLAS_HOOK_TOPIC_UNSORTED); put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC); } }; @@ -77,6 +96,7 @@ public class KafkaNotification extends AbstractNotification implements Service { private static final Map<NotificationType, String[]> CONSUMER_TOPICS_MAP = new HashMap<NotificationType, String[]>() { { put(NotificationType.HOOK, trimAndPurge(ATLAS_HOOK_CONSUMER_TOPICS)); + put(NotificationType.HOOK_UNSORTED, trimAndPurge(ATLAS_HOOK_UNSORTED_CONSUMER_TOPICS)); put(NotificationType.ENTITIES, trimAndPurge(ATLAS_ENTITIES_CONSUMER_TOPICS)); } }; @@ -86,6 +106,7 @@ public class KafkaNotification extends AbstractNotification implements Service { private final Map<NotificationType, List<KafkaConsumer>> consumers = new HashMap<>(); private final Map<NotificationType, KafkaProducer> producers = new HashMap<>(); private String consumerClosedErrorMsg; + private final Map<String, KafkaProducer> producersByTopic = new HashMap<>(); // ----- Constructors ---------------------------------------------------- @@ -255,6 +276,21 @@ public class KafkaNotification extends AbstractNotification implements Service { LOG.info("<== KafkaNotification.close()"); } + //Sending messages received through HTTP or REST Notification Service to Producer + public void sendInternal(String topic, List<String> messages, 
boolean isSortNeeded) throws NotificationException { + KafkaProducer producer; + if (isSortNeeded) { + topic = topic + UNSORTED_POSTFIX; + } + producer = getOrCreateProducer(topic); + sendInternalToProducer(producer, topic, messages); + } + + public void sendInternal(String topic, List<String> messages) throws NotificationException { + KafkaProducer producer = getOrCreateProducer(topic); + + sendInternalToProducer(producer, topic, messages); + } // ----- AbstractNotification -------------------------------------------- @Override @@ -266,7 +302,11 @@ public class KafkaNotification extends AbstractNotification implements Service { @VisibleForTesting void sendInternalToProducer(Producer p, NotificationType notificationType, List<String> messages) throws NotificationException { - String topic = PRODUCER_TOPIC_MAP.get(notificationType); + String topic = PRODUCER_TOPIC_MAP.get(notificationType); + sendInternalToProducer(p, topic, messages); + } + + void sendInternalToProducer(Producer p, String topic , List<String> messages) throws NotificationException { List<MessageContext> messageContexts = new ArrayList<>(); for (String message : messages) { @@ -308,6 +348,9 @@ public class KafkaNotification extends AbstractNotification implements Service { public Properties getConsumerProperties(NotificationType notificationType) { // find the configured group id for the given notification type String groupId = properties.getProperty(notificationType.toString().toLowerCase() + "." 
+ CONSUMER_GROUP_ID_PROPERTY); + if (StringUtils.isEmpty(groupId)) { + groupId = "atlas"; + } if (StringUtils.isEmpty(groupId)) { throw new IllegalStateException("No configuration group id set for the notification type " + notificationType); @@ -346,21 +389,45 @@ public class KafkaNotification extends AbstractNotification implements Service { private KafkaProducer getOrCreateProducer(NotificationType notificationType) { LOG.debug("==> KafkaNotification.getOrCreateProducer()"); - KafkaProducer ret = producers.get(notificationType); + KafkaProducer ret = getOrCreateProducerByCriteria(notificationType, producers, false); + + LOG.debug("<== KafkaNotification.getOrCreateProducer()"); + + return ret; + } + + private KafkaProducer getOrCreateProducer(String topic) { + LOG.debug("==> KafkaNotification.getOrCreateProducer() by Topic"); + + KafkaProducer ret = getOrCreateProducerByCriteria(topic, producersByTopic, true); + + LOG.debug("<== KafkaNotification.getOrCreateProducer by Topic"); + + return ret; + } + + private KafkaProducer getOrCreateProducerByCriteria(Object producerCriteria, Map producersByCriteria, boolean fetchByTopic) { + LOG.debug("==> KafkaNotification.getOrCreateProducerByCriteria()"); + + if ((fetchByTopic && !(producerCriteria instanceof String)) || (!fetchByTopic && !(producerCriteria instanceof NotificationType))) { + LOG.error("Error while retrieving Producer due to invalid criteria"); + } + + KafkaProducer ret = (KafkaProducer) producersByCriteria.get(producerCriteria); if (ret == null) { synchronized (this) { - ret = producers.get(notificationType); + ret = (KafkaProducer) producersByCriteria.get(producerCriteria); if (ret == null) { ret = new KafkaProducer(properties); - producers.put(notificationType, ret); + producersByCriteria.put(producerCriteria, ret); } } } - LOG.debug("<== KafkaNotification.getOrCreateProducer()"); + LOG.debug("<== KafkaNotification.getOrCreateProducerByCriteria()"); return ret; } diff --git 
a/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java b/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java index b35af97fd..9d8686257 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java +++ b/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java @@ -17,26 +17,28 @@ */ package org.apache.atlas.kafka; +import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; -import org.apache.atlas.notification.LogConfigUtils; +import org.apache.atlas.hook.AtlasHook; +import org.apache.atlas.notification.AbstractNotification; import org.apache.atlas.notification.NotificationInterface; +import org.apache.atlas.notification.rest.RestNotification; import org.apache.atlas.notification.spool.AtlasFileSpool; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; - /** * Provider class for Notification interfaces */ public class NotificationProvider { private static final Logger LOG = LoggerFactory.getLogger(NotificationProvider.class); - private static final String CONF_ATLAS_HOOK_SPOOL_ENABLED = "atlas.hook.spool.enabled"; - private static final String CONF_ATLAS_HOOK_SPOOL_DIR = "atlas.hook.spool.dir"; + @VisibleForTesting + public static final String CONF_ATLAS_HOOK_SPOOL_ENABLED = "atlas.hook.spool.enabled"; + private static final String CONF_ATLAS_HOOK_SPOOL_DIR = "atlas.hook.spool.dir"; private static final boolean CONF_ATLAS_HOOK_SPOOL_ENABLED_DEFAULT = false; @@ -45,25 +47,32 @@ public class NotificationProvider { public static NotificationInterface get() { if (notificationProvider == null) { try { - Configuration conf = ApplicationProperties.get(); - KafkaNotification kafka = new KafkaNotification(conf); - String spoolDir = getSpoolDir(conf); + Configuration conf = 
ApplicationProperties.get(); + String spoolDir = getSpoolDir(conf); + AbstractNotification absNotifier = null; + + if (AtlasHook.isRESTNotificationEnabled) { + absNotifier = new RestNotification(conf); + } else { + absNotifier = new KafkaNotification(conf); + } if (isSpoolingEnabled(conf) && StringUtils.isNotEmpty(spoolDir)) { LOG.info("Notification spooling is enabled: spool directory={}", spoolDir); conf.setProperty(CONF_ATLAS_HOOK_SPOOL_DIR, spoolDir); - notificationProvider = new AtlasFileSpool(conf, kafka); + notificationProvider = new AtlasFileSpool(conf, absNotifier); } else { LOG.info("Notification spooling is not enabled"); - notificationProvider = kafka; + notificationProvider = absNotifier; } } catch (AtlasException e) { - throw new RuntimeException(e); + throw new RuntimeException("Error while initializing Notification interface", e); } } + LOG.debug("NotificationInterface of type {} is enabled", notificationProvider.getClass().getSimpleName()); return notificationProvider; } diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java index 207747d7d..3048b9c95 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java +++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java @@ -65,6 +65,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message private final AtomicLong messageCountSinceLastInterval = new AtomicLong(0); private long msgCreated; private boolean spooled; + private String source; // ----- Constructors ---------------------------------------------------- /** @@ -112,6 +113,10 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message return this.spooled; } + public String getSource() { + return this.source; + } + @Override public T 
deserialize(String messageJson) { final T ret; @@ -120,6 +125,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message messageCountSinceLastInterval.incrementAndGet(); this.msgCreated = 0; this.spooled = false; + this.source = null; AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, AtlasNotificationMessage.class); @@ -128,6 +134,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message } else { this.msgCreated = ((AtlasNotificationMessage) msg).getMsgCreationTime(); this.spooled = ((AtlasNotificationMessage) msg).getSpooled(); + this.source = msg.getSource() != null ? msg.getSource().getSource() : null; String msgJson = messageJson; diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java index 1fb9f9989..83af92b64 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java @@ -63,4 +63,11 @@ public interface NotificationConsumer<T> { * @return List containing kafka message and partionId and offset. */ List<AtlasKafkaMessage<T>> receiveWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset); + + /** + * Fetch raw data for the topics from Kafka, if lastCommittedOffset same as message + * received offset, it will proceed with commit. + * @return List containing kafka message and partitionId and offset. 
+ */ + List<AtlasKafkaMessage<T>> receiveRawRecordsWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset); } diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java index a9cd4a6bb..3fb616edb 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java @@ -46,6 +46,9 @@ public interface NotificationInterface { // Notifications from the Atlas integration hooks. HOOK(new HookMessageDeserializer()), + // Notifications from the Atlas integration hooks - unsorted. + HOOK_UNSORTED(new HookMessageDeserializer()), + // Notifications to entity change consumers. ENTITIES(new EntityMessageDeserializer()); diff --git a/notification/src/main/java/org/apache/atlas/notification/rest/RestNotification.java b/notification/src/main/java/org/apache/atlas/notification/rest/RestNotification.java new file mode 100644 index 000000000..fb598f899 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/rest/RestNotification.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.notification.rest; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasConfiguration; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.notification.AtlasNotificationBaseMessage; +import org.apache.atlas.notification.AbstractNotification; +import org.apache.atlas.notification.NotificationConsumer; +import org.apache.atlas.notification.NotificationException; +import org.apache.atlas.utils.AuthenticationUtil; +import org.apache.commons.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +import static org.apache.atlas.kafka.KafkaNotification.ATLAS_ENTITIES_TOPIC; +import static org.apache.atlas.kafka.KafkaNotification.ATLAS_HOOK_TOPIC; + +public class RestNotification extends AbstractNotification { + + private static final Logger LOG = LoggerFactory.getLogger(RestNotification.class); + private static final int BATCH_MAX_LENGTH_BYTES = AtlasConfiguration.NOTIFICATION_REST_BODY_MAX_LENGTH_BYTES.getInt(); + private static final String ATLAS_ENDPOINT = "atlas.rest.address"; + private static final String BASIC_AUTH_USERNAME = "atlas.rest.basic.auth.username"; + private static final String BASIC_AUTH_PASSWORD = "atlas.rest.basic.auth.password"; + private static final String DEFAULT_ATLAS_URL = "http://localhost:31000/"; + + private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP = new HashMap<NotificationType, String>() { + { + put(NotificationType.HOOK, ATLAS_HOOK_TOPIC); + put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC); + } + }; + + @VisibleForTesting + public AtlasClientV2 atlasClientV2; 
+ + public RestNotification(Configuration configuration) throws AtlasException { + super(); + setupAtlasClientV2(configuration); + } + + private AtlasClientV2 setupAtlasClientV2(Configuration configuration) throws AtlasException { + if (atlasClientV2 != null) { + return atlasClientV2; + } + try { + String[] atlasEndPoint = configuration.getStringArray(ATLAS_ENDPOINT); + + if (atlasEndPoint == null || atlasEndPoint.length == 0) { + atlasEndPoint = new String[] { DEFAULT_ATLAS_URL }; + } + + if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) { + String fileAuthUsername = configuration.getString(BASIC_AUTH_USERNAME, "admin"); + String fileAuthPassword = configuration.getString(BASIC_AUTH_PASSWORD, "admin123"); + String[] basicAuthUsernamePassword = (fileAuthUsername == null || fileAuthPassword == null) + ? AuthenticationUtil.getBasicAuthenticationInput() + : new String[]{fileAuthUsername, fileAuthPassword}; + + atlasClientV2 = new AtlasClientV2(atlasEndPoint, basicAuthUsernamePassword); + } else { + atlasClientV2 = new AtlasClientV2(atlasEndPoint); + } + } catch (AtlasException e) { + throw new AtlasException(e); + } + + return atlasClientV2; + } + + @Override + public void sendInternal(NotificationType type, List<String> messages) throws NotificationException { + String topic = PRODUCER_TOPIC_MAP.get(type); + List<List<String>> batches = getBatches(messages); + + int batchCounter = 0; + try { + for (List<String> batch: batches) { + batchCounter++; + atlasClientV2.postNotificationToTopic(topic, batch); + } + } catch (AtlasServiceException e) { + if (e.getMessage().contains(AtlasErrorCode.NOTIFICATION_EXCEPTION.getErrorCode())) { + LOG.error("Sending notifications through REST interface failed starting from batch# {}", batchCounter); + throw new NotificationException(e); + } else { + throw new RuntimeException(e); + } + } + } + + private List<List<String>> getBatches(List<String> messages) { + List<List<String>> batches = new ArrayList(); + List<String> batch = 
new ArrayList(); + int batchSize = 0; + + for (String message : messages) { + byte[] msgBytes = AtlasNotificationBaseMessage.getBytesUtf8(message); + + if (batchSize > 0 && batchSize + msgBytes.length > BATCH_MAX_LENGTH_BYTES) { + batches.add(batch); + + batch = new ArrayList(); + batchSize = 0; + } + batch.add(message); + batchSize += msgBytes.length; + } + batches.add(batch); + return batches; + } + + @Override + public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) { + return null; + } + + @Override + public void close() { + + } + + @Override + public boolean isReady(NotificationType type) { + return true; + } +} diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java index 1b486e528..aee59a395 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java @@ -229,6 +229,11 @@ public class AbstractNotificationConsumerTest { public List<AtlasKafkaMessage<TestMessage>> receiveWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) { return receive(); } + + @Override + public List<AtlasKafkaMessage<TestMessage>> receiveRawRecordsWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) { + return null; + } } public static class TestMessageDeserializer extends AbstractMessageDeserializer<TestMessage> { diff --git a/notification/src/test/java/org/apache/atlas/notification/RestNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/RestNotificationTest.java new file mode 100644 index 000000000..476518df6 --- /dev/null +++ b/notification/src/test/java/org/apache/atlas/notification/RestNotificationTest.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasBaseClient; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasConfiguration; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.kafka.NotificationProvider; +import org.apache.atlas.notification.rest.RestNotification; +import org.apache.commons.configuration.Configuration; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import java.util.ArrayList; +import java.util.Arrays; + +import static org.apache.atlas.kafka.KafkaNotification.ATLAS_HOOK_TOPIC; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; + +public class RestNotificationTest { + + private 
NotificationInterface notifier; + private Configuration conf; + + @Mock + private WebResource service; + + @Mock + private WebResource.Builder resourceBuilderMock; + + @BeforeClass + public void setup() throws Exception { + MockitoAnnotations.initMocks(this); + + conf = ApplicationProperties.get(); + + conf.setProperty(AtlasConfiguration.NOTIFICATION_HOOK_REST_ENABLED.getPropertyName(), true); + conf.setProperty(NotificationProvider.CONF_ATLAS_HOOK_SPOOL_ENABLED, false); + + notifier = NotificationProvider.get(); + + } + + private WebResource.Builder setupBuilder(AtlasClientV2.API api, WebResource webResource) { + when(webResource.path(api.getPath())).thenReturn(service); + when(webResource.path(api.getNormalizedPath())).thenReturn(service); + + return getBuilder(service); + } + + private WebResource.Builder getBuilder(WebResource resourceObject) { + when(resourceObject.getRequestBuilder()).thenReturn(resourceBuilderMock); + when(resourceObject.path(anyString())).thenReturn(resourceObject); + when(resourceBuilderMock.accept(MediaType.APPLICATION_JSON)).thenReturn(resourceBuilderMock); + when(resourceBuilderMock.type(MediaType.MULTIPART_FORM_DATA)).thenReturn(resourceBuilderMock); + when(resourceBuilderMock.type(MediaType.APPLICATION_JSON + "; charset=UTF-8")).thenReturn(resourceBuilderMock); + + return resourceBuilderMock; + } + + @Test + public void testNotificationProvider () throws Exception { + assertEquals(notifier.getClass(), RestNotification.class); + } + + @Test + public void testPostNotificationToTopic () throws Exception { + AtlasClientV2 client = new AtlasClientV2(service, conf); + AtlasBaseClient.API api = client.formatPathWithParameter(AtlasClientV2.API_V2.POST_NOTIFICATIONS_TO_TOPIC, ATLAS_HOOK_TOPIC); + WebResource.Builder builder = setupBuilder(api, service); + ClientResponse response = mock(ClientResponse.class); + + when(response.getStatus()).thenReturn(Response.Status.NO_CONTENT.getStatusCode()); + when(builder.method(anyString(), 
Matchers.<Class>any(), anyList())).thenReturn(response); + + ((RestNotification)notifier).atlasClientV2 = client; + + try { + ((RestNotification)notifier).sendInternal(NotificationInterface.NotificationType.HOOK, new ArrayList<String>(Arrays.asList("Dummy"))); + } catch (NotificationException e) { + Assert.fail("Failed with Exception"); + } + } + + @Test + public void testNotificationException () throws Exception { + AtlasClientV2 client = new AtlasClientV2(service, conf); + AtlasBaseClient.API api = client.formatPathWithParameter(AtlasClientV2.API_V2.POST_NOTIFICATIONS_TO_TOPIC, ATLAS_HOOK_TOPIC); + WebResource.Builder builder = setupBuilder(api, service); + ClientResponse response = mock(ClientResponse.class); + + when(response.getStatus()).thenReturn(AtlasErrorCode.NOTIFICATION_EXCEPTION.getHttpCode().getStatusCode()); + when(response.getEntity(String.class)).thenReturn(AtlasErrorCode.NOTIFICATION_EXCEPTION.getErrorCode()); + when(builder.method(anyString(), Matchers.<Class>any(), anyList())).thenReturn(response); + + ((RestNotification)notifier).atlasClientV2 = client; + + try { + ((RestNotification)notifier).sendInternal(NotificationInterface.NotificationType.HOOK, new ArrayList<String>(Arrays.asList("Dummy"))); + } catch (NotificationException e) { + Assert.assertTrue(e.getMessage().contains(AtlasErrorCode.NOTIFICATION_EXCEPTION.getErrorCode())); + } + } + +} diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 1cdfcef8a..936423aa2 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -22,7 +22,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.atlas.*; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.ha.HAConfiguration; +import 
org.apache.atlas.hook.AtlasHook; import org.apache.atlas.kafka.AtlasKafkaMessage; +import org.apache.atlas.kafka.KafkaNotification; import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; @@ -83,6 +85,7 @@ import org.springframework.stereotype.Component; import javax.inject.Inject; import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -91,6 +94,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -123,6 +127,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private static final String EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION = "PermanentLockingException"; private static final int KAFKA_CONSUMER_SHUTDOWN_WAIT = 30000; + private static final String ATLAS_HOOK_CONSUMER_THREAD_NAME = "atlas-hook-consumer-thread"; + private static final String ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME = "atlas-hook-unsorted-consumer-thread"; + // from org.apache.hadoop.hive.ql.parse.SemanticAnalyzer public static final String DUMMY_DATABASE = "_dummy_database"; public static final String DUMMY_TABLE = "_dummy_table"; @@ -195,6 +202,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private Instant nextStatsLogTime = AtlasMetricsCounter.getNextHourStartTime(Instant.now()); private final Map<TopicPartition, Long> lastCommittedPartitionOffset; private final EntityCorrelationManager entityCorrelationManager; + private final long consumerMsgBufferingIntervalMS; + private final int consumerMsgBufferingBatchSize; @VisibleForTesting final int consumerRetryInterval; @@ -230,6 +239,8 @@ 
public class NotificationHookConsumer implements Service, ActiveStateChangeHandl largeMessageProcessingTimeThresholdMs = applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60 * 1000); // 60 sec by default createShellEntityForNonExistingReference = AtlasConfiguration.NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean(); authorizeUsingMessageUser = applicationProperties.getBoolean(CONSUMER_AUTHORIZE_USING_MESSAGE_USER, false); + consumerMsgBufferingIntervalMS = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_BUFFERING_INTERVAL.getInt() * 1000; + consumerMsgBufferingBatchSize = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_BUFFERING_BATCH_SIZE.getInt(); int authnCacheTtlSeconds = applicationProperties.getInt(CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS, 300); @@ -350,17 +361,35 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } private void startConsumers(ExecutorService executorService) { - int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1); + int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1); + Map<NotificationConsumer<HookNotification>, NotificationType> notificationConsumersByType = new HashMap<>(); + List<NotificationConsumer<HookNotification>> notificationConsumers = notificationInterface.createConsumers(NotificationType.HOOK, numThreads); + for (NotificationConsumer<HookNotification> notificationConsumer : notificationConsumers) { + notificationConsumersByType.put(notificationConsumer, NotificationType.HOOK); + } + + if (AtlasHook.isHookMsgsSortEnabled) { + List<NotificationConsumer<HookNotification>> unsortedNotificationConsumers = notificationInterface.createConsumers(NotificationType.HOOK_UNSORTED, numThreads); + for (NotificationConsumer<HookNotification> unsortedNotificationConsumer : unsortedNotificationConsumers) { + notificationConsumersByType.put(unsortedNotificationConsumer, 
NotificationType.HOOK_UNSORTED); + } + } if (executorService == null) { - executorService = Executors.newFixedThreadPool(notificationConsumers.size(), new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build()); + executorService = Executors.newFixedThreadPool(notificationConsumersByType.size(), new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build()); } executors = executorService; - for (final NotificationConsumer<HookNotification> consumer : notificationConsumers) { - HookConsumer hookConsumer = new HookConsumer(consumer); + for (final NotificationConsumer<HookNotification> consumer : notificationConsumersByType.keySet()) { + String hookConsumerName = ATLAS_HOOK_CONSUMER_THREAD_NAME; + + if (notificationConsumersByType.get(consumer).equals(NotificationType.HOOK_UNSORTED)) { + hookConsumerName = ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME; + } + + HookConsumer hookConsumer = new HookConsumer(hookConsumerName, consumer); consumers.add(hookConsumer); executors.submit(hookConsumer); @@ -529,8 +558,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final List<String> failedMessages = new ArrayList<>(); private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration); + private int duplicateKeyCounter = 1; + public HookConsumer(NotificationConsumer<HookNotification> consumer) { - super("atlas-hook-consumer-thread"); + super(ATLAS_HOOK_CONSUMER_THREAD_NAME); + + this.consumer = consumer; + } + + public HookConsumer(String consumerThreadName, NotificationConsumer<HookNotification> consumer) { + super(consumerThreadName); this.consumer = consumer; } @@ -548,10 +585,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl try { while (shouldRun.get()) { try { - List<AtlasKafkaMessage<HookNotification>> messages = consumer.receiveWithCheckedCommit(lastCommittedPartitionOffset); - - for 
(AtlasKafkaMessage<HookNotification> msg : messages) { - handleMessage(msg); + if (StringUtils.equals(ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME, this.getName())) { + long msgBufferingStartTime = System.currentTimeMillis(); + Map<String,AtlasKafkaMessage<HookNotification>> msgBuffer = new TreeMap<>(); + sortAndPublishMsgsToAtlasHook(msgBufferingStartTime, msgBuffer); + } else { + List<AtlasKafkaMessage<HookNotification>> messages = consumer.receiveWithCheckedCommit(lastCommittedPartitionOffset); + for (AtlasKafkaMessage<HookNotification> msg : messages) { + handleMessage(msg); + } } } catch (IllegalStateException ex) { adaptiveWaiter.pause(ex); @@ -576,6 +618,63 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } } + private void resetDuplicateKeyCounter() { + duplicateKeyCounter = 1; + } + + private String getKey(String msgCreated, String source) { + return String.format("%s_%s", msgCreated, source); + } + + private void sortMessages(AtlasKafkaMessage<HookNotification> msg, Map<String,AtlasKafkaMessage<HookNotification>> msgBuffer) { + String key = getKey(Long.toString(msg.getMsgCreated()), msg.getSource()); + if (msgBuffer.containsKey(key)) { // A duplicate key is possible for messages from the same source with the same msgCreationTime + key = getKey(key, Integer.toString(duplicateKeyCounter)); + duplicateKeyCounter++; + } + msgBuffer.put(key, msg); + } + + void sortAndPublishMsgsToAtlasHook(long msgBufferingStartTime, Map<String,AtlasKafkaMessage<HookNotification>> msgBuffer) throws NotificationException { + + List<AtlasKafkaMessage<HookNotification>> messages = consumer.receiveRawRecordsWithCheckedCommit(lastCommittedPartitionOffset); + AtlasKafkaMessage<HookNotification> maxOffsetMsg = null; + long maxOffsetProcessed = 0; + + messages.stream().forEach(x -> sortMessages(x, msgBuffer)); + + if (msgBuffer.size() < consumerMsgBufferingBatchSize && System.currentTimeMillis() - msgBufferingStartTime < 
consumerMsgBufferingIntervalMS) { + 
sortAndPublishMsgsToAtlasHook(msgBufferingStartTime, msgBuffer); + return; + } + + for (AtlasKafkaMessage<HookNotification> msg : msgBuffer.values()) { + String hookTopic = StringUtils.isNotEmpty(msg.getTopic()) ? msg.getTopic().split(KafkaNotification.UNSORTED_POSTFIX)[0] : KafkaNotification.ATLAS_HOOK_TOPIC; + if (maxOffsetProcessed == 0 || maxOffsetProcessed < msg.getOffset()) { + maxOffsetMsg = msg; + maxOffsetProcessed = msg.getOffset(); + } + + ((KafkaNotification)notificationInterface).sendInternal(hookTopic, + StringUtils.isNotEmpty(msg.getRawRecordData()) ? Arrays.asList(msg.getRawRecordData()) : Arrays.asList(msg.getMessage().toString())); + } + + + /** In case of a failure while publishing the sorted messages (the for loop above), consumption of unsorted messages should restart from the initial offset. + * This for loop is deliberately kept separate so that messages are committed only after the whole batch has been sent to the hook topic. + */ + for (AtlasKafkaMessage<HookNotification> msg : msgBuffer.values()) { + commit(msg); + } + + if (maxOffsetMsg != null) { + commit(maxOffsetMsg); + } + + msgBuffer.clear(); + resetDuplicateKeyCounter(); + } + @VisibleForTesting void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) { AtlasPerfTracer perf = null; diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/NotificationREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/NotificationREST.java new file mode 100644 index 000000000..295579e46 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/rest/NotificationREST.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.web.rest; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.atlas.AtlasConfiguration; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.authorize.AtlasAdminAccessRequest; +import org.apache.atlas.authorize.AtlasAuthorizationUtils; +import org.apache.atlas.authorize.AtlasPrivilege; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.hook.AtlasHook; +import org.apache.atlas.kafka.KafkaNotification; +import org.apache.atlas.notification.NotificationException; +import org.apache.atlas.notification.NotificationInterface; +import org.apache.atlas.utils.AtlasJson; +import org.apache.atlas.web.util.Servlets; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import javax.inject.Inject; +import javax.inject.Singleton; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.*; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import java.io.IOException; +import java.util.*; + +@Path("v2/notification") +@Singleton +@Service +@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON}) +@Produces({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON}) +public class NotificationREST { + private static final Logger LOG = LoggerFactory.getLogger(NotificationREST.class); + public static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString(); + 
public static final String ATLAS_ENTITIES_TOPIC = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString(); + private static final String[] ATLAS_HOOK_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC); + private static final String[] ATLAS_ENTITIES_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_ENTITIES_TOPIC); + private static final Set<String> TOPICS = new HashSet<>(); + + private final NotificationInterface notificationInterface; + + static { + TOPICS.addAll(Arrays.asList(ATLAS_HOOK_CONSUMER_TOPICS)); + TOPICS.addAll(Arrays.asList(ATLAS_ENTITIES_CONSUMER_TOPICS)); + } + + @Inject + public NotificationREST(NotificationInterface notificationInterface) { + this.notificationInterface = notificationInterface; + } + + /** + * Publish notifications on Kafka topic + * + * @param topicName - nameOfTheQueue + * @throws AtlasBaseException + */ + @POST + @Path("/topic/{topicName}") + @Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON}) + public void handleNotifications(@PathParam("topicName") String topicName, @Context HttpServletRequest request) throws AtlasBaseException, IOException { + LOG.debug("Handling notifications for topic {}", topicName); + AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.SERVICE_NOTIFICATION_POST), "post on rest notification service"); + + if (!TOPICS.contains(topicName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_TOPIC_NAME, topicName); + } + + String messagesAsJson = Servlets.getRequestPayload(request); + List<String> messages = getMessagesToNotify(messagesAsJson); + + try { + KafkaNotification notifier = (KafkaNotification) notificationInterface; + notifier.sendInternal(topicName, messages, AtlasHook.isHookMsgsSortEnabled); + + } catch (NotificationException exception) { + List<String> failedMessages = exception.getFailedMessages(); + String concatenatedMessage = 
StringUtils.join(failedMessages, "\n"); + + throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_EXCEPTION, exception, concatenatedMessage); + } + + } + + private List<String> getMessagesToNotify(String messagesAsJson) { + List<String> messages = new ArrayList<>(); + + try { + ArrayNode messageNodes = AtlasJson.parseToV1ArrayNode(messagesAsJson); + for (JsonNode messageNode : messageNodes) { + messages.add(AtlasJson.toV1Json(messageNode)); + } + } catch (IOException e) { + messages.add(messagesAsJson); + } + + return messages; + } + +} \ No newline at end of file diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/NotificationRestIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/NotificationRestIT.java new file mode 100644 index 000000000..2c907598d --- /dev/null +++ b/webapp/src/test/java/org/apache/atlas/web/integration/NotificationRestIT.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.web.integration; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.utils.TestResourceFileUtils; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; + +import static org.apache.atlas.kafka.KafkaNotification.ATLAS_HOOK_TOPIC; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; + +public class NotificationRestIT extends BaseResourceIT { + + @Test + public void unAuthPostNotification() throws IOException { + AtlasClientV2 unAuthClient = new AtlasClientV2(atlasUrls, new String[]{"admin", "wr0ng_pa55w0rd"}); + + try { + unAuthClient.postNotificationToTopic(ATLAS_HOOK_TOPIC, new ArrayList<String>(Arrays.asList("Dummy"))); + } catch(AtlasServiceException e) { + assertNotNull(e.getStatus(), "expected server error code in the status"); + } + } + + @Test + public void postNotificationBasicTest() throws Exception { + String db_name = "db_" + randomString(); + String cluster_name = "cl" + randomString(); + String qualifiedName = db_name + "@" + cluster_name; + String notificationString = TestResourceFileUtils.getJson("notifications/create-db") + .replaceAll("--name--", db_name).replaceAll("--clName--", cluster_name) + .replace("\"--ts--\"", String.valueOf((new Date()).getTime())); + + try { + atlasClientV2.postNotificationToTopic(ATLAS_HOOK_TOPIC, new ArrayList<String>(Arrays.asList(notificationString))); + + waitFor(MAX_WAIT_TIME, new Predicate() { + @Override + public boolean evaluate() throws Exception { + ArrayNode results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, qualifiedName)); + + return results.size() == 1; + } + }); + } catch(AtlasServiceException e) { + assertNull(e.getStatus(), "expected no server error code in the status"); + } + + } +} diff 
--git a/webapp/src/test/resources/json/notifications/create-db-ddl.json b/webapp/src/test/resources/json/notifications/create-db-ddl.json new file mode 100644 index 000000000..443495dde --- /dev/null +++ b/webapp/src/test/resources/json/notifications/create-db-ddl.json @@ -0,0 +1 @@ +{"version":{"version":"1.0.0","versionParts":[1]},"msgCompressionKind":"NONE","msgSplitIdx":1,"msgSplitCount":1,"msgSourceIP":"172.27.100.78","msgCreatedBy":"hive","msgCreationTime":"--ts--","spooled":false,"message":{"type":"ENTITY_CREATE_V2","user":"root","entities":{"entities":[{"typeName":"hive_db_ddl","attributes":{"serviceType":"hive","qualifiedName":"--name--@--clName--:--execTime--","execTime":"--execTime--","queryText":"create database --name--","name":"create database --name--" [...] \ No newline at end of file diff --git a/webapp/src/test/resources/json/notifications/create-db.json b/webapp/src/test/resources/json/notifications/create-db.json new file mode 100644 index 000000000..8df0e4dcc --- /dev/null +++ b/webapp/src/test/resources/json/notifications/create-db.json @@ -0,0 +1 @@ +{"version":{"version":"1.0.0","versionParts":[1]},"msgCompressionKind":"NONE","msgSplitIdx":1,"msgSplitCount":1,"msgSourceIP":"172.27.10.4","msgCreatedBy":"hive","msgCreationTime":"--ts--","message":{"type":"ENTITY_CREATE_V2","user":"hive","entities":{"referredEntities":{},"entities":[{"typeName":"hive_db","attributes":{"owner":"admin","ownerType":"USER","managedLocation":null,"qualifiedName":"--name--@--clName--","clusterName":"--clName--","name":"--name--","location":"some_location","p [...] 
\ No newline at end of file diff --git a/webapp/src/test/resources/json/notifications/delete-db.json b/webapp/src/test/resources/json/notifications/delete-db.json new file mode 100644 index 000000000..26e82e917 --- /dev/null +++ b/webapp/src/test/resources/json/notifications/delete-db.json @@ -0,0 +1 @@ +{"version":{"version":"1.0.0","versionParts":[1]},"msgCompressionKind":"NONE","msgSplitIdx":1,"msgSplitCount":1,"msgSourceIP":"172.27.72.140","msgCreatedBy":"hive","msgCreationTime":"--ts--","spooled":false,"message":{"type":"ENTITY_DELETE_V2","user":"hive","entities":[{"typeName":"hive_db","uniqueAttributes":{"qualifiedName":"--name--@--clName--"}}]}} \ No newline at end of file