This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new 978087a88 ATLAS-4878: utility to analyze hook notifications 978087a88 is described below commit 978087a882348f1fc1b6002a0aeb29192d8cc00a Author: Madhan Neethiraj <mad...@apache.org> AuthorDate: Sun Jun 9 18:36:56 2024 -0700 ATLAS-4878: utility to analyze hook notifications --- distro/pom.xml | 1 + .../src/main/assemblies/notification-analyzer.xml | 71 +++++ pom.xml | 1 + tools/notification-analyzer/README | 119 ++++++++ tools/notification-analyzer/pom.xml | 192 ++++++++++++ .../scripts/notification-analyzer.sh | 28 ++ .../apache/atlas/tools/NotificationAnalyzer.java | 322 +++++++++++++++++++++ .../main/resources/atlas-application.properties | 18 ++ .../src/main/resources/atlas-log4j.xml | 44 +++ 9 files changed, 796 insertions(+) diff --git a/distro/pom.xml b/distro/pom.xml index 874b944f2..66f710896 100644 --- a/distro/pom.xml +++ b/distro/pom.xml @@ -136,6 +136,7 @@ atlas.graph.storage.hbase.regions-per-server=1 <descriptor>src/main/assemblies/atlas-repair-index-package.xml</descriptor> <!--<descriptor>src/main/assemblies/migration-exporter.xml</descriptor>--> <descriptor>src/main/assemblies/classification-updater.xml</descriptor> + <descriptor>src/main/assemblies/notification-analyzer.xml</descriptor> </descriptors> <finalName>apache-atlas-${project.version}</finalName> <tarLongFileMode>gnu</tarLongFileMode> diff --git a/distro/src/main/assemblies/notification-analyzer.xml b/distro/src/main/assemblies/notification-analyzer.xml new file mode 100644 index 000000000..63d9f2490 --- /dev/null +++ b/distro/src/main/assemblies/notification-analyzer.xml @@ -0,0 +1,71 @@ +<!-- +** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership.
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* +--> +<assembly> + <id>notification-analyzer</id> + <formats> + <format>zip</format> + </formats> + + <baseDirectory>notification-analyzer</baseDirectory> + + <fileSets> + <fileSet> + <includes> + <include>README*</include> + </includes> + </fileSet> + <fileSet> + <directory>../tools/notification-analyzer/target/dependency</directory> + <outputDirectory>.</outputDirectory> + </fileSet> + <fileSet> + <directory>../tools/notification-analyzer/scripts</directory> + <outputDirectory>.</outputDirectory> + <includes> + <include>*.sh</include> + </includes> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + <fileSet> + <directory>../tools/notification-analyzer/src/main/resources</directory> + <outputDirectory>.</outputDirectory> + <includes> + <include>atlas-log4j.xml</include> + <include>atlas-application.properties</include> + </includes> + </fileSet> + <fileSet> + <directory>../tools/notification-analyzer</directory> + <outputDirectory>.</outputDirectory> + <includes> + <include>README</include> + </includes> + </fileSet> + <fileSet> + <directory>../tools/notification-analyzer/target</directory> + <outputDirectory>.</outputDirectory> + <includes> + <include>atlas-notification-analyzer-${project.version}.jar</include> + </includes> + </fileSet> + </fileSets> +</assembly> diff --git a/pom.xml b/pom.xml index 2f9aedbd2..bfb1094ff 100644 --- a/pom.xml +++ b/pom.xml @@
-846,6 +846,7 @@ <module>addons/kafka-bridge</module> <module>tools/classification-updater</module> <module>tools/atlas-index-repair</module> + <module>tools/notification-analyzer</module> <module>addons/impala-hook-api</module> <module>addons/impala-bridge-shim</module> <module>addons/impala-bridge</module> diff --git a/tools/notification-analyzer/README b/tools/notification-analyzer/README new file mode 100644 index 000000000..7631e06d9 --- /dev/null +++ b/tools/notification-analyzer/README @@ -0,0 +1,119 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +Introduction + This utility analyzes hook notification messages stored in JSON format + in a file, and reports the following details: + - number of notifications per notification type + - number of entities created/updated + - number of entity references in notifications per entity type + - number of entity operations performed + - number of entity operations performed per entity type + +Setup + - All libraries necessary to run the utility are packaged in the following file: + distro/target/apache-atlas-<version>-notification-analyzer.zip + + - Unzip the file into the directory where the tool needs to be installed.
+ + - Update the log configuration in atlas-log4j.xml, if needed + + +Running the utility + - Execute the following command to start the utility: + ./notification-analyzer.sh -m message_file.json [-o output_file] + + - Progress will be printed in the following format in the output file (if specified) or on stdout: + PROGRESS #1: analyzed 1000 notifications, 1071 messages + PROGRESS #2: analyzed 2000 notifications, 2131 messages + PROGRESS #3: analyzed 3000 notifications, 3194 messages + ... + Completed analyzing 114755 notifications, 120816 messages. Time taken: 234 seconds + + - Note that the number of notifications might be less than the number + of messages in the file if some notifications were split into + multiple messages due to their size. + + - Logs will be written to the file /tmp/atlas-notification-analyzer.log. The location of + the log file can be configured using the following environment variables: + LOGFILE_DIR + LOGFILE_NAME + +Sample Result: + The utility will print the result of the analysis in the following format: + { + "notifications": 114755, + "notificationLengthAvg": 74331, + "notificationLengthMax": 101148684, + "splitNotifications": 453, + "splitNotificationLengthAvg": 13901446, + "splitNotificationLengthMax": 101148684, + "entities": 598435, + "notificationEntities": 2575347, + "notificationByType": { + "ENTITY_CREATE_V2": 49428, + "ENTITY_FULL_UPDATE_V2": 1597, + "ENTITY_PARTIAL_UPDATE_V2": 36561, + "ENTITY_DELETE_V2": 27169 + }, + "notificationEntityByType": { + "hdfs_path": 16417, + "hive_db": 20471, + "hive_table": 57143, + "hive_storagedesc": 30018, + "hive_column": 685384, + "hive_process": 41512, + "hive_column_lineage": 1724402 + }, + "entityOperations": { + "CREATE": 598435, + "UPDATE": 1913182, + "PARTIAL_UPDATE": 36561, + "DELETE": 27169 + }, + "entityOperationsByType": { + "CREATE": { + "hdfs_path": 10940, + "hive_db": 224, + "hive_table": 22154, + "hive_storagedesc": 15280, + "hive_column": 332332, + "hive_process": 23462, + "hive_column_lineage":
194043 + }, + "UPDATE": { + "hdfs_path": 5477, + "hive_column": 319559, + "hive_column_lineage": 1530359, + "hive_db": 20203, + "hive_process": 18050, + "hive_storagedesc": 13204, + "hive_table": 6330 + }, + "PARTIAL_UPDATE": { + "hive_column": 33493, + "hive_storagedesc": 1534, + "hive_table": 1534 + }, + "DELETE": { + "hive_db": 44, + "hive_table": 27125 + } + } + } + diff --git a/tools/notification-analyzer/pom.xml b/tools/notification-analyzer/pom.xml new file mode 100644 index 000000000..2f772a76f --- /dev/null +++ b/tools/notification-analyzer/pom.xml @@ -0,0 +1,192 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License.
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>apache-atlas</artifactId> + <groupId>org.apache.atlas</groupId> + <version>3.0.0-SNAPSHOT</version> + <relativePath>../../</relativePath> + </parent> + <artifactId>atlas-notification-analyzer</artifactId> + <description>Apache Atlas Notification Analyzer</description> + <name>Apache Atlas Notification Analyzer</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-notification</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + <version>${commons-cli.version}</version> + </dependency> + <dependency> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + <version>${commons-collections.version}</version> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>dist</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>copy-binaries</id> + <phase>package</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/dependency</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <artifactItems> + <artifactItem> +
<groupId>${project.groupId}</groupId> + <artifactId>${project.artifactId}</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-intg</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-notification</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + <version>${commons-cli.version}</version> + </artifactItem> + <artifactItem> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>${commons-codec.version}</version> + </artifactItem> + <artifactItem> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + <version>${commons-collections.version}</version> + </artifactItem> + <artifactItem> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>${commons-io.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </artifactItem> + <artifactItem> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + <version>1.26.2</version> + </artifactItem> + <artifactItem> + <groupId>commons-configuration</groupId> + <artifactId>commons-configuration</artifactId> + <version>${commons-conf.version}</version> + </artifactItem> + <artifactItem> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <version>${commons-lang.version}</version> + </artifactItem> + <artifactItem> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + <version>${commons-logging.version}</version> + </artifactItem> + <artifactItem> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>${jackson.version}</version> +
</artifactItem> + <artifactItem> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>${jackson.version}</version> + </artifactItem> + <artifactItem> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.databind.version}</version> + </artifactItem> + <artifactItem> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>${log4j.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>${log4j2.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>${log4j2.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + </artifactItem> + <artifactItem> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j.version}</version> + </artifactItem> + </artifactItems> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> +</project> diff --git a/tools/notification-analyzer/scripts/notification-analyzer.sh b/tools/notification-analyzer/scripts/notification-analyzer.sh new file mode 100644 index 000000000..179336bd0 --- /dev/null +++ b/tools/notification-analyzer/scripts/notification-analyzer.sh @@ -0,0 +1,28 @@ +#!/bin/bash +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and +# limitations under the License. See accompanying LICENSE file. +# +# locate the directory containing this script; the jars and config files are expected there (symlinks are not resolved) + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" + +LIB_DIR="${SCRIPT_DIR}" +LOGFILE_DIR="${LOGFILE_DIR:-/tmp}" +LOGFILE_NAME="${LOGFILE_NAME:-atlas-notification-analyzer.log}" + +CP="${LIB_DIR}" +for i in "${LIB_DIR}/"*.jar; do + CP="${CP}:$i" +done + +java -cp "${CP}" -Dlog4j.configuration=atlas-log4j.xml "-Datlas.log.dir=${LOGFILE_DIR}" "-Datlas.log.file=${LOGFILE_NAME}" org.apache.atlas.tools.NotificationAnalyzer "$@" diff --git a/tools/notification-analyzer/src/main/java/org/apache/atlas/tools/NotificationAnalyzer.java b/tools/notification-analyzer/src/main/java/org/apache/atlas/tools/NotificationAnalyzer.java new file mode 100644 index 000000000..20b106546 --- /dev/null +++ b/tools/notification-analyzer/src/main/java/org/apache/atlas/tools/NotificationAnalyzer.java @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.atlas.tools; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.notification.HookNotification; +import org.apache.atlas.notification.AtlasNotificationMessageDeserializer; +import org.apache.atlas.notification.NotificationInterface.NotificationType; +import org.apache.atlas.utils.AtlasJson; +import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityPartialUpdateRequest; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest; +import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2; +import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2; +import org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateRequestV2; +import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2; +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class NotificationAnalyzer { + private static final Logger LOG = LoggerFactory.getLogger(NotificationAnalyzer.class); + + private final String msgFile; + private final String outputFile; + private final AtlasNotificationMessageDeserializer deserializer; + private final Map<String, AtomicInteger> notificationCountByType = new HashMap<>(); + private final AtomicInteger entityCount = new AtomicInteger(); + private final Map<String, AtomicInteger> entityCountByType = new HashMap<>(); + private
final Map<String, AtomicInteger> entityOperCount = new HashMap<>(); + private final Map<String, Map<String, AtomicInteger>> entityOperByTypeCount = new HashMap<>(); + private final Set<String> knownEntities = new HashSet<>(); + private final IntSummaryStatistics notificationStats = new IntSummaryStatistics(); + private final IntSummaryStatistics splitNotificationStats = new IntSummaryStatistics(); + + public static void main(String[] args) { + CommandLineParser parser = new BasicParser(); + Options options = new Options(); + + options.addOption("m", "message-file", true, "Messages file"); + options.addOption("o", "output-file", true, "Output file"); + + try { + CommandLine cmd = parser.parse(options, args); + String msgFile = cmd.getOptionValue("m"); + String outFile = cmd.getOptionValue("o"); + + if (msgFile == null || msgFile.isEmpty()) { + msgFile = "ATLAS_HOOK.json"; + } + + NotificationAnalyzer analyzer = new NotificationAnalyzer(msgFile, outFile, NotificationType.HOOK); + + analyzer.analyze(); + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + + public NotificationAnalyzer(String msgFile, String outputFile, NotificationType notificationType) { + this.msgFile = msgFile; + this.outputFile = outputFile; + this.deserializer = notificationType.getDeserializer(); + } + + public void analyze() throws Exception { + long startTimeMs = System.currentTimeMillis(); + + try (BufferedReader reader = getInputReader(); PrintWriter writer = getOutputWriter()) { + int msgCount = 0; + int notificationSize = 0; + + for (String msg = reader.readLine(); msg != null; msg = reader.readLine()) { + msgCount++; + notificationSize += msg.length(); + + HookNotification notification = (HookNotification) deserializer.deserialize(msg); + + if (notification == null) { // split notification, continue + continue; + } + + notificationStats.accept(notificationSize); + + if (notificationSize > msg.length()) { + splitNotificationStats.accept(notificationSize); + } +
notificationSize = 0; + + notificationCountByType.computeIfAbsent(notification.getType().name(), e -> new AtomicInteger()).incrementAndGet(); + + switch (notification.getType()) { + case ENTITY_CREATE: + handleEntityCreate((EntityCreateRequest) notification); + break; + case ENTITY_PARTIAL_UPDATE: + handleEntityPartialUpdate((EntityPartialUpdateRequest) notification); + break; + case ENTITY_FULL_UPDATE: + handleEntityUpdate((EntityUpdateRequest) notification); + break; + case ENTITY_DELETE: + handleEntityDelete((EntityDeleteRequest) notification); + break; + case ENTITY_CREATE_V2: + handleEntityCreateV2((EntityCreateRequestV2) notification); + break; + case ENTITY_PARTIAL_UPDATE_V2: + handleEntityPartialUpdateV2((EntityPartialUpdateRequestV2) notification); + break; + case ENTITY_FULL_UPDATE_V2: + handleEntityUpdateV2((EntityUpdateRequestV2) notification); + break; + case ENTITY_DELETE_V2: + handleEntityDeleteV2((EntityDeleteRequestV2) notification); + break; + } + + if ((notificationStats.getCount() % 1000) == 0) { /* progress is reported (and numbered) per 1000 notifications, not per 1000 messages */ + LOG.info("PROGRESS #{}: analyzed {} notifications, {} messages", (notificationStats.getCount() / 1000), notificationStats.getCount(), msgCount); + writer.printf("PROGRESS #%1$d: analyzed %2$d notifications, %3$d messages%n", (notificationStats.getCount() / 1000), notificationStats.getCount(), msgCount); + writer.flush(); + } + } + + long timeTakenSeconds = (System.currentTimeMillis() - startTimeMs) / 1000; + + LOG.info("Completed analyzing {}: {} notifications, {} messages. Time taken: {} seconds", msgFile, notificationStats.getCount(), msgCount, timeTakenSeconds); + writer.printf("Completed analyzing %1$d notifications, %2$d messages.
Time taken: %3$d seconds%n", notificationStats.getCount(), msgCount, timeTakenSeconds); + writer.flush(); + + Map<String, Object> results = new LinkedHashMap<>(); + + results.put("messages", msgCount); + results.put("notifications", notificationStats.getCount()); + results.put("notificationLengthAvg", (int) notificationStats.getAverage()); + results.put("notificationLengthMax", notificationStats.getCount() > 0 ? notificationStats.getMax() : 0); /* IntSummaryStatistics.getMax() is Integer.MIN_VALUE when nothing was recorded */ + results.put("splitNotifications", splitNotificationStats.getCount()); + results.put("splitNotificationLengthAvg", (int) splitNotificationStats.getAverage()); + results.put("splitNotificationLengthMax", splitNotificationStats.getCount() > 0 ? splitNotificationStats.getMax() : 0); + results.put("entities", entityCount); + results.put("notificationEntities", entityCountByType.values().stream().mapToInt(AtomicInteger::get).sum()); + results.put("notificationByType", notificationCountByType); + results.put("notificationEntityByType", entityCountByType); + results.put("entityOperations", entityOperCount); + results.put("entityOperationsByType", entityOperByTypeCount); + + String msg = AtlasJson.toJson(results); + + LOG.info(msg); + writer.println(msg); + writer.flush(); + } + } + + private void handleEntityCreate(EntityCreateRequest request) { + if (request.getEntities() != null) { + for (Referenceable entity : request.getEntities()) { + recordEntity(entity); + } + } + } + + private void handleEntityUpdate(EntityUpdateRequest request) { + if (request.getEntities() != null) { + for (Referenceable entity : request.getEntities()) { + recordEntity(entity); + } + } + } + + private void handleEntityPartialUpdate(EntityPartialUpdateRequest request) { + recordEntityOperation(request.getTypeName(), "PARTIAL_UPDATE"); + } + + private void handleEntityDelete(EntityDeleteRequest request) { + knownEntities.remove(getEntityKey(request)); + + recordEntityOperation(request.getTypeName(), "DELETE"); + } + + private void handleEntityCreateV2(EntityCreateRequestV2 request) { + if (request.getEntities() != null) { + if (request.getEntities().getEntities() !=
null) { + for (AtlasEntity entity : request.getEntities().getEntities()) { + recordEntity(entity); + } + } + + if (request.getEntities().getReferredEntities() != null) { + for (AtlasEntity entity : request.getEntities().getReferredEntities().values()) { + recordEntity(entity); + } + } + } + } + + private void handleEntityUpdateV2(EntityUpdateRequestV2 request) { + if (request.getEntities() != null) { + if (request.getEntities().getEntities() != null) { + for (AtlasEntity entity : request.getEntities().getEntities()) { + recordEntity(entity); + } + } + + if (request.getEntities().getReferredEntities() != null) { + for (AtlasEntity entity : request.getEntities().getReferredEntities().values()) { + recordEntity(entity); + } + } + } + } + + private void handleEntityPartialUpdateV2(EntityPartialUpdateRequestV2 request) { + if (request.getEntity() != null && request.getEntity().getEntity() != null) { + AtlasEntity entity = request.getEntity().getEntity(); + + recordEntityOperation(entity.getTypeName(), "PARTIAL_UPDATE"); + } + } + + private void handleEntityDeleteV2(EntityDeleteRequestV2 request) { + if (request.getEntities() != null) { + for (AtlasObjectId objId : request.getEntities()) { + knownEntities.remove(getEntityKey(objId)); + + recordEntityOperation(objId.getTypeName(), "DELETE"); + } + } + } + + private void recordEntity(AtlasEntity entity) { + final String operation; + + if (knownEntities.add(getEntityKey(entity))) { + operation = "CREATE"; + } else { + operation = "UPDATE"; + } + + recordEntityOperation(entity.getTypeName(), operation); + } + + private void recordEntity(Referenceable entity) { + final String operation; + + if (knownEntities.add(getEntityKey(entity))) { + operation = "CREATE"; + } else { + operation = "UPDATE"; + } + + recordEntityOperation(entity.getTypeName(), operation); + } + + private void recordEntityOperation(String entityTypeName, String operation) { + if (operation.equals("CREATE")) { + entityCount.incrementAndGet(); + } +
entityCountByType.computeIfAbsent(entityTypeName, c -> new AtomicInteger()).incrementAndGet(); + entityOperCount.computeIfAbsent(operation, c -> new AtomicInteger()).incrementAndGet(); + entityOperByTypeCount.computeIfAbsent(operation, c -> new TreeMap<>()).computeIfAbsent(entityTypeName, c -> new AtomicInteger()).incrementAndGet(); + } + + private String getEntityKey(AtlasEntity entity) { + return entity.getTypeName() + ":" + getUniqueKey(entity.getAttributes()); + } + + private String getEntityKey(AtlasObjectId objectId) { + return objectId.getTypeName() + ":" + getUniqueKey(objectId.getUniqueAttributes()); + } + + private String getEntityKey(Referenceable entity) { + return entity.getTypeName() + ":" + getUniqueKey(entity.getValues()); + } + + private String getEntityKey(EntityDeleteRequest request) { + return request.getTypeName() + ":" + request.getAttributeValue(); + } + + private Object getUniqueKey(Map<String, Object> attributes) { /* attributes may be null, e.g. an AtlasObjectId referenced only by guid */ + Object ret = attributes != null ? attributes.get("qualifiedName") : null; + + return (ret == null && attributes != null) ? attributes.get("name") : ret; + } + + private BufferedReader getInputReader() throws IOException { /* notification JSON is UTF-8; do not depend on the platform default charset */ + return new BufferedReader(new InputStreamReader(new FileInputStream(msgFile), java.nio.charset.StandardCharsets.UTF_8)); + } + + private PrintWriter getOutputWriter() throws IOException { + return (outputFile == null || outputFile.isEmpty()) ? new PrintWriter(System.out) : new PrintWriter(new FileWriter(outputFile, true)); + } +} diff --git a/tools/notification-analyzer/src/main/resources/atlas-application.properties b/tools/notification-analyzer/src/main/resources/atlas-application.properties new file mode 100644 index 000000000..01d705720 --- /dev/null +++ b/tools/notification-analyzer/src/main/resources/atlas-application.properties @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + diff --git a/tools/notification-analyzer/src/main/resources/atlas-log4j.xml b/tools/notification-analyzer/src/main/resources/atlas-log4j.xml new file mode 100755 index 000000000..0f9182f36 --- /dev/null +++ b/tools/notification-analyzer/src/main/resources/atlas-log4j.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License.
+ --> + +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> + +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + <appender name="console" class="org.apache.log4j.ConsoleAppender"> + <param name="Target" value="System.out"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/> + </layout> + </appender> + + <appender name="FILE" class="org.apache.log4j.RollingFileAppender"> + <param name="File" value="${atlas.log.dir}/${atlas.log.file}"/> + <param name="Append" value="true"/> + <param name="maxFileSize" value="100MB" /> + <param name="maxBackupIndex" value="20" /> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/> + </layout> + </appender> + + <root> + <priority value="info"/> + <appender-ref ref="FILE"/> + </root> +</log4j:configuration>