This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 978087a88 ATLAS-4878: utility to analyze hook notifications
978087a88 is described below

commit 978087a882348f1fc1b6002a0aeb29192d8cc00a
Author: Madhan Neethiraj <mad...@apache.org>
AuthorDate: Sun Jun 9 18:36:56 2024 -0700

    ATLAS-4878: utility to analyze hook notifications
---
 distro/pom.xml                                     |   1 +
 .../src/main/assemblies/notification-analyzer.xml  |  71 +++++
 pom.xml                                            |   1 +
 tools/notification-analyzer/README                 | 119 ++++++++
 tools/notification-analyzer/pom.xml                | 192 ++++++++++++
 .../scripts/notification-analyzer.sh               |  28 ++
 .../apache/atlas/tools/NotificationAnalyzer.java   | 322 +++++++++++++++++++++
 .../main/resources/atlas-application.properties    |  18 ++
 .../src/main/resources/atlas-log4j.xml             |  44 +++
 9 files changed, 796 insertions(+)

diff --git a/distro/pom.xml b/distro/pom.xml
index 874b944f2..66f710896 100644
--- a/distro/pom.xml
+++ b/distro/pom.xml
@@ -136,6 +136,7 @@ atlas.graph.storage.hbase.regions-per-server=1
                                         
<descriptor>src/main/assemblies/atlas-repair-index-package.xml</descriptor>
                                         
<!--<descriptor>src/main/assemblies/migration-exporter.xml</descriptor>-->
                                         
<descriptor>src/main/assemblies/classification-updater.xml</descriptor>
+                                        
<descriptor>src/main/assemblies/notification-analyzer.xml</descriptor>
                                     </descriptors>
                                     
<finalName>apache-atlas-${project.version}</finalName>
                                     <tarLongFileMode>gnu</tarLongFileMode>
diff --git a/distro/src/main/assemblies/notification-analyzer.xml 
b/distro/src/main/assemblies/notification-analyzer.xml
new file mode 100644
index 000000000..63d9f2490
--- /dev/null
+++ b/distro/src/main/assemblies/notification-analyzer.xml
@@ -0,0 +1,71 @@
+<!--
+**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+*
+-->
+<assembly>
+    <id>notification-analyzer</id>
+    <formats>
+        <format>zip</format>
+    </formats>
+
+    <baseDirectory>notification-analyzer</baseDirectory>
+
+    <fileSets>
+        <fileSet>
+            <includes>
+                <include>README*</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            
<directory>../tools/notification-analyzer/target/dependency</directory>
+            <outputDirectory>.</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>../tools/notification-analyzer/scripts</directory>
+            <outputDirectory>.</outputDirectory>
+            <includes>
+                <include>*.sh</include>
+            </includes>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+        <fileSet>
+            
<directory>../tools/notification-analyzer/src/main/resources</directory>
+            <outputDirectory>.</outputDirectory>
+            <includes>
+                <include>atlas-log4j.xml</include>
+                <include>atlas-application.properties</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>../tools/notification-analyzer</directory>
+            <outputDirectory>.</outputDirectory>
+            <includes>
+                <include>README</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>../tools/notification-analyzer/target</directory>
+            <outputDirectory>.</outputDirectory>
+            <includes>
+                
<include>atlas-notification-analyzer-${project.version}.jar</include>
+            </includes>
+        </fileSet>
+    </fileSets>
+</assembly>
diff --git a/pom.xml b/pom.xml
index 2f9aedbd2..bfb1094ff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -846,6 +846,7 @@
         <module>addons/kafka-bridge</module>
         <module>tools/classification-updater</module>
         <module>tools/atlas-index-repair</module>
+        <module>tools/notification-analyzer</module>
         <module>addons/impala-hook-api</module>
         <module>addons/impala-bridge-shim</module>
         <module>addons/impala-bridge</module>
diff --git a/tools/notification-analyzer/README 
b/tools/notification-analyzer/README
new file mode 100644
index 000000000..7631e06d9
--- /dev/null
+++ b/tools/notification-analyzer/README
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+Introduction
+  This utility analyzes hook notification messages stored in JSON format
+  in a file, and reports following details:
+    - number of notifications per notification type
+    - number of entities created/updated
+    - number of entity references in notifications per entity type
+    - number of entity operations performed
+    - number of entity operations performed per entity type
+
+Setup
+  - All libraries necessary to run the utility are packaged in following file:
+      distro/target/apache-atlas-<version>-notification-analyzer.zip
+
+  - Unzip the file in the directory where the tool needs to be installed.
+
+  - Update log configurations in atlas-log4j.xml
+
+
+Running the utility
+  - Execute following command to start the utility:
+     ./notification-analyzer.sh -m message_file.json [-o output_file]
+
+  - Progress will be printed in the following format in the output file (if 
specified) or on stdout:
+     PROGRESS #1: analyzed 1000 notifications, 1071 messages
+     PROGRESS #2: analyzed 2000 notifications, 2131 messages
+     PROGRESS #3: analyzed 3000 notifications, 3194 messages
+     ...
+     Completed analyzing 114755 notification, 120816 messages. Time taken: 234 
seconds
+
+  - Note that the number of notifications might be less than the number
+    of messages in the file in case some notifications were split into
+    multiple messages due to their size.
+
+  - Logs will be printed in file /tmp/atlas-notification-analyzer.log. The 
location of
+    the log file can be configured using following environment variables:
+      LOGFILE_DIR
+      LOGFILE_NAME
+
+Sample Result:
+  The utility will print the result of analysis in the following format:
+    {
+      "notifications":                 114755,
+      "notificationLengthAvg":          74331,
+      "notificationLengthMax":      101148684,
+      "splitNotifications":               453,
+      "splitNotificationLengthAvg":  13901446,
+      "splitNotificationLengthMax": 101148684,
+      "entities":                      598435,
+      "notificationEntities":         2575347,
+      "notificationByType": {
+       "ENTITY_CREATE_V2":         49428,
+       "ENTITY_FULL_UPDATE_V2":     1597,
+       "ENTITY_PARTIAL_UPDATE_V2": 36561,
+       "ENTITY_DELETE_V2":         27169
+      },
+      "notificationEntityByType": {
+       "hdfs_path":             16417,
+       "hive_db":               20471,
+       "hive_table":            57143,
+       "hive_storagedesc":      30018,
+       "hive_column":          685384,
+       "hive_process":          41512,
+       "hive_column_lineage": 1724402
+      },
+      "entityOperations": {
+       "CREATE":         598435,
+       "UPDATE":        1913182,
+       "PARTIAL_UPDATE":  36561,
+       "DELETE":          27169
+      },
+      "entityOperationsByType": {
+       "CREATE": {
+         "hdfs_path":            10940,
+         "hive_db":                224,
+         "hive_table":           22154,
+         "hive_storagedesc":     15280,
+         "hive_column":         332332,
+         "hive_process":         23462,
+         "hive_column_lineage": 194043
+       },
+       "UPDATE": {
+         "hdfs_path":              5477,
+         "hive_column":          319559,
+         "hive_column_lineage": 1530359,
+         "hive_db":               20203,
+         "hive_process":          18050,
+         "hive_storagedesc":      13204,
+         "hive_table":             6330
+       },
+       "PARTIAL_UPDATE": {
+        "hive_column":      33493,
+        "hive_storagedesc":  1534,
+        "hive_table":        1534
+       },
+       "DELETE": {
+         "hive_db":       44,
+         "hive_table": 27125
+       }
+      }
+    }
+
diff --git a/tools/notification-analyzer/pom.xml 
b/tools/notification-analyzer/pom.xml
new file mode 100644
index 000000000..2f772a76f
--- /dev/null
+++ b/tools/notification-analyzer/pom.xml
@@ -0,0 +1,192 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>apache-atlas</artifactId>
+        <groupId>org.apache.atlas</groupId>
+        <version>3.0.0-SNAPSHOT</version>
+        <relativePath>../../</relativePath>
+    </parent>
+    <artifactId>atlas-notification-analyzer</artifactId>
+    <description>Apache Atlas Notification Analyzer</description>
+    <name>Apache Atlas Notification Analyzer</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+           <version>${slf4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+           <version>${slf4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-notification</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>${commons-cli.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
+            <version>${commons-collections.version}</version>
+        </dependency>
+    </dependencies>
+
+    <profiles>
+        <profile>
+            <id>dist</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-dependency-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>copy-binaries</id>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>copy</goal>
+                                </goals>
+                                <configuration>
+                                    
<outputDirectory>${project.build.directory}/dependency</outputDirectory>
+                                    
<overWriteReleases>false</overWriteReleases>
+                                    
<overWriteSnapshots>false</overWriteSnapshots>
+                                    <overWriteIfNewer>true</overWriteIfNewer>
+                                    <artifactItems>
+                                        <artifactItem>
+                                            
<groupId>${project.groupId}</groupId>
+                                            
<artifactId>${project.artifactId}</artifactId>
+                                            
<version>${project.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            
<groupId>${project.groupId}</groupId>
+                                            <artifactId>atlas-intg</artifactId>
+                                            
<version>${project.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            
<groupId>${project.groupId}</groupId>
+                                            
<artifactId>atlas-notification</artifactId>
+                                            
<version>${project.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>commons-cli</groupId>
+                                            
<artifactId>commons-cli</artifactId>
+                                            
<version>${commons-cli.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>commons-codec</groupId>
+                                            
<artifactId>commons-codec</artifactId>
+                                            
<version>${commons-codec.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            
<groupId>commons-collections</groupId>
+                                            
<artifactId>commons-collections</artifactId>
+                                            
<version>${commons-collections.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>commons-io</groupId>
+                                            <artifactId>commons-io</artifactId>
+                                            
<version>${commons-io.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            
<groupId>org.apache.commons</groupId>
+                                            
<artifactId>commons-lang3</artifactId>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            
<groupId>org.apache.commons</groupId>
+                                            
<artifactId>commons-compress</artifactId>
+                                            <version>1.26.2</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            
<groupId>commons-configuration</groupId>
+                                            
<artifactId>commons-configuration</artifactId>
+                                            
<version>${commons-conf.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>commons-lang</groupId>
+                                            
<artifactId>commons-lang</artifactId>
+                                            
<version>${commons-lang.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>commons-logging</groupId>
+                                            
<artifactId>commons-logging</artifactId>
+                                            
<version>${commons-logging.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            
<groupId>com.fasterxml.jackson.core</groupId>
+                                            
<artifactId>jackson-annotations</artifactId>
+                                            
<version>${jackson.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            
<groupId>com.fasterxml.jackson.core</groupId>
+                                            
<artifactId>jackson-core</artifactId>
+                                            
<version>${jackson.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            
<groupId>com.fasterxml.jackson.core</groupId>
+                                            
<artifactId>jackson-databind</artifactId>
+                                            
<version>${jackson.databind.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>log4j</groupId>
+                                            <artifactId>log4j</artifactId>
+                                            <version>${log4j.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            
<groupId>org.apache.logging.log4j</groupId>
+                                            <artifactId>log4j-core</artifactId>
+                                            
<version>${log4j2.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            
<groupId>org.apache.logging.log4j</groupId>
+                                            <artifactId>log4j-api</artifactId>
+                                            
<version>${log4j2.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>org.slf4j</groupId>
+                                            <artifactId>slf4j-api</artifactId>
+                                            <version>${slf4j.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>org.slf4j</groupId>
+                                            
<artifactId>slf4j-log4j12</artifactId>
+                                            <version>${slf4j.version}</version>
+                                        </artifactItem>
+                                    </artifactItems>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>
diff --git a/tools/notification-analyzer/scripts/notification-analyzer.sh 
b/tools/notification-analyzer/scripts/notification-analyzer.sh
new file mode 100644
index 000000000..179336bd0
--- /dev/null
+++ b/tools/notification-analyzer/scripts/notification-analyzer.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License. See accompanying LICENSE file.
+#
+# resolve links - $0 may be a softlink
+
+M2_REPO=~/.m2/repository
+
+LIB_DIR=$(pwd)
+LOGFILE_DIR="${LOGFILE_DIR:-/tmp/}"
+LOGFILE_NAME="${LOGFILE_NAME:-atlas-notification-analyzer.log}"
+
+CP=.
+for i in "${LIB_DIR}/"*.jar; do
+  CP="${CP}:$i"
+done
+
+java -cp ${CP} -Dlog4j.configuration=atlas-log4j.xml 
-Datlas.log.dir=${LOGFILE_DIR} -Datlas.log.file=${LOGFILE_NAME}  
org.apache.atlas.tools.NotificationAnalyzer $*
diff --git 
a/tools/notification-analyzer/src/main/java/org/apache/atlas/tools/NotificationAnalyzer.java
 
b/tools/notification-analyzer/src/main/java/org/apache/atlas/tools/NotificationAnalyzer.java
new file mode 100644
index 000000000..20b106546
--- /dev/null
+++ 
b/tools/notification-analyzer/src/main/java/org/apache/atlas/tools/NotificationAnalyzer.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.tools;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.notification.AtlasNotificationMessageDeserializer;
+import org.apache.atlas.notification.NotificationInterface.NotificationType;
+import org.apache.atlas.utils.AtlasJson;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import 
org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
+import 
org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
+import 
org.apache.atlas.v1.model.notification.HookNotificationV1.EntityPartialUpdateRequest;
+import 
org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest;
+import 
org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
+import 
org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2;
+import 
org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateRequestV2;
+import 
org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class NotificationAnalyzer {
+    private static final Logger LOG = 
LoggerFactory.getLogger(NotificationAnalyzer.class);
+
+    private final String                                  msgFile;
+    private final String                                  outputFile;
+    private final AtlasNotificationMessageDeserializer    deserializer;
+    private final Map<String, AtomicInteger>              
notificationCountByType = new HashMap<>();
+    private final AtomicInteger                           entityCount          
   = new AtomicInteger();
+    private final Map<String, AtomicInteger>              entityCountByType    
   = new HashMap<>();
+    private final Map<String, AtomicInteger>              entityOperCount      
   = new HashMap<>();
+    private final Map<String, Map<String, AtomicInteger>> 
entityOperByTypeCount   = new HashMap<>();
+    private final Set<String>                             knownEntities        
   = new HashSet<>();
+    private final IntSummaryStatistics                    notificationStats    
   = new IntSummaryStatistics();
+    private final IntSummaryStatistics                    
splitNotificationStats  = new IntSummaryStatistics();
+
+    public static void main(String[] args) {
+        CommandLineParser parser  = new BasicParser();
+        Options           options = new Options();
+
+        options.addOption("m", "message-file", true, "Messages file");
+        options.addOption("o", "output-file", true, "Output file");
+
+        try {
+            CommandLine cmd     = parser.parse(options, args);
+            String      msgFile = cmd.getOptionValue("m");
+            String      outFile = cmd.getOptionValue("o");
+
+            if (msgFile == null || msgFile.isEmpty()) {
+                msgFile = "ATLAS_HOOK.json";
+            }
+
+            NotificationAnalyzer analyzer = new NotificationAnalyzer(msgFile, 
outFile, NotificationType.HOOK);
+
+            analyzer.analyze();
+        } catch (Throwable t) {
+            throw new RuntimeException(t);
+        }
+    }
+
+    /**
+     * Creates an analyzer for the given message file.
+     *
+     * @param msgFile          path of the file holding the messages to analyze
+     * @param outputFile       path of the file to write output to
+     * @param notificationType notification type whose deserializer is used to parse each message
+     */
+    public NotificationAnalyzer(String msgFile, String outputFile, NotificationType notificationType) {
+        this.deserializer = notificationType.getDeserializer();
+        this.outputFile   = outputFile;
+        this.msgFile      = msgFile;
+    }
+
+    /**
+     * Reads the message file line by line, deserializes each line as a hook notification and
+     * accumulates counts and length statistics. Progress lines and the final JSON summary are
+     * written both to the logger and to the output writer.
+     *
+     * A line for which the deserializer returns null is treated as one part of a split
+     * notification; its length is carried over until a line yields a complete notification.
+     *
+     * @throws Exception propagated from reading the input, deserializing a message or handling it
+     */
+    public void analyze() throws Exception {
+        long startTimeMs = System.currentTimeMillis();
+
+        try (BufferedReader reader = getInputReader(); PrintWriter writer = getOutputWriter()) {
+            int msgCount         = 0;
+            int notificationSize = 0; // characters accumulated for the notification being assembled
+
+            for (String msg = reader.readLine(); msg != null; msg = reader.readLine()) {
+                msgCount++;
+                notificationSize += msg.length();
+
+                HookNotification notification = (HookNotification) deserializer.deserialize(msg);
+
+                if (notification == null) { // split notification, continue
+                    continue;
+                }
+
+                notificationStats.accept(notificationSize);
+
+                // accumulated length exceeds this message => earlier messages were parts of this notification
+                if (notificationSize > msg.length()) {
+                    splitNotificationStats.accept(notificationSize);
+                }
+
+                notificationSize = 0;
+
+                notificationCountByType.computeIfAbsent(notification.getType().name(), e -> new AtomicInteger()).incrementAndGet();
+
+                switch (notification.getType()) {
+                    case ENTITY_CREATE:
+                        handleEntityCreate((EntityCreateRequest) notification);
+                        break;
+                    case ENTITY_PARTIAL_UPDATE:
+                        handleEntityPartialUpdate((EntityPartialUpdateRequest) notification);
+                        break;
+                    case ENTITY_FULL_UPDATE:
+                        handleEntityUpdate((EntityUpdateRequest) notification);
+                        break;
+                    case ENTITY_DELETE:
+                        handleEntityDelete((EntityDeleteRequest) notification);
+                        break;
+                    case ENTITY_CREATE_V2:
+                        handleEntityCreateV2((EntityCreateRequestV2) notification);
+                        break;
+                    case ENTITY_PARTIAL_UPDATE_V2:
+                        handleEntityPartialUpdateV2((EntityPartialUpdateRequestV2) notification);
+                        break;
+                    case ENTITY_FULL_UPDATE_V2:
+                        handleEntityUpdateV2((EntityUpdateRequestV2) notification);
+                        break;
+                    case ENTITY_DELETE_V2:
+                        handleEntityDeleteV2((EntityDeleteRequestV2) notification);
+                        break;
+                    default: // other types are only counted in notificationCountByType
+                        break;
+                }
+
+                long notificationCount = notificationStats.getCount();
+
+                if ((notificationCount % 1000) == 0) {
+                    // progress is emitted once per 1000 notifications, so number it by notifications;
+                    // numbering by messages drifts as soon as split notifications are present
+                    long progressId = notificationCount / 1000;
+
+                    LOG.info("PROGRESS #{}: analyzed {} notifications, {} messages", progressId, notificationCount, msgCount);
+                    writer.printf("PROGRESS #%1$d: analyzed %2$d notifications, %3$d messages%n", progressId, notificationCount, msgCount);
+                    writer.flush();
+                }
+            }
+
+            if (notificationSize > 0) {
+                // the last message(s) never produced a notification; report instead of dropping silently
+                LOG.warn("Ignored {} trailing characters that did not complete a notification", notificationSize);
+            }
+
+            long timeTakenSeconds = (System.currentTimeMillis() - startTimeMs) / 1000;
+
+            LOG.info("Completed analyzing {}. Time taken: {} seconds", msgFile, timeTakenSeconds);
+            writer.printf("Completed analyzing %1$s. Time taken: %2$d seconds%n", msgFile, timeTakenSeconds);
+            writer.flush();
+
+            // LinkedHashMap keeps the keys in insertion order in the generated JSON
+            Map<String, Object> results = new LinkedHashMap<>();
+
+            results.put("messages", msgCount);
+            results.put("notifications", notificationStats.getCount());
+            results.put("notificationLengthAvg", (int) notificationStats.getAverage());
+            results.put("notificationLengthMax", notificationStats.getMax());
+            results.put("splitNotifications", splitNotificationStats.getCount());
+            results.put("splitNotificationLengthAvg", (int) splitNotificationStats.getAverage());
+            results.put("splitNotificationLengthMax", splitNotificationStats.getMax());
+            results.put("entities", entityCount);
+            results.put("notificationEntities", entityCountByType.values().stream().mapToInt(AtomicInteger::get).sum());
+            results.put("notificationByType", notificationCountByType);
+            results.put("notificationEntityByType", entityCountByType);
+            results.put("entityOperations", entityOperCount);
+            results.put("entityOperationsByType", entityOperByTypeCount);
+
+            String msg = AtlasJson.toJson(results);
+
+            LOG.info(msg);
+            writer.println(msg);
+            writer.flush();
+        }
+    }
+
+    /**
+     * Records each entity carried by a V1 entity-create notification.
+     * A request without an entity collection is ignored.
+     *
+     * @param request V1 create notification being analyzed
+     */
+    private void handleEntityCreate(EntityCreateRequest request) {
+        if (request.getEntities() == null) {
+            return; // nothing to record
+        }
+
+        for (Referenceable created : request.getEntities()) {
+            recordEntity(created);
+        }
+    }
+
+    private void handleEntityUpdate(EntityUpdateRequest request) {
+        if (request.getEntities() != null) {
+            for (Referenceable entity : request.getEntities()) {
+                recordEntity(entity);
+            }
+        }
+    }
+
+    private void handleEntityPartialUpdate(EntityPartialUpdateRequest request) 
{
+        recordEntityOperation(request.getTypeName(), "PARTIAL_UPDATE");
+    }
+
+    private void handleEntityDelete(EntityDeleteRequest request) {
+        knownEntities.remove(getEntityKey(request));
+
+        recordEntityOperation(request.getTypeName(), "DELETE");
+    }
+
+    /**
+     * Records every entity of a V2 entity-create notification: first the
+     * top-level entities, then the referred entities. Missing collections
+     * are skipped.
+     *
+     * @param request V2 create notification being analyzed
+     */
+    private void handleEntityCreateV2(EntityCreateRequestV2 request) {
+        if (request.getEntities() == null) {
+            return;
+        }
+
+        // top-level entities of the notification
+        if (request.getEntities().getEntities() != null) {
+            for (AtlasEntity topLevel : request.getEntities().getEntities()) {
+                recordEntity(topLevel);
+            }
+        }
+
+        // entities referenced by the top-level ones
+        if (request.getEntities().getReferredEntities() != null) {
+            for (AtlasEntity referred : request.getEntities().getReferredEntities().values()) {
+                recordEntity(referred);
+            }
+        }
+    }
+
+    private void handleEntityUpdateV2(EntityUpdateRequestV2 request) {
+        if (request.getEntities() != null) {
+            if (request.getEntities().getEntities() != null) {
+                for (AtlasEntity entity : request.getEntities().getEntities()) 
{
+                    recordEntity(entity);
+                }
+            }
+
+            if (request.getEntities().getReferredEntities() != null) {
+                for (AtlasEntity entity : 
request.getEntities().getReferredEntities().values()) {
+                    recordEntity(entity);
+                }
+            }
+        }
+    }
+
+    /**
+     * Counts a V2 partial-update notification as one PARTIAL_UPDATE operation
+     * against the type of the wrapped entity. Requests without an entity are
+     * ignored.
+     *
+     * @param request V2 partial-update notification being analyzed
+     */
+    private void handleEntityPartialUpdateV2(EntityPartialUpdateRequestV2 request) {
+        if (request.getEntity() == null) {
+            return;
+        }
+
+        AtlasEntity target = request.getEntity().getEntity();
+
+        if (target != null) {
+            recordEntityOperation(target.getTypeName(), "PARTIAL_UPDATE");
+        }
+    }
+
+    private void handleEntityDeleteV2(EntityDeleteRequestV2 request) {
+        if (request.getEntities() != null) {
+            for (AtlasObjectId objId : request.getEntities()) {
+                knownEntities.remove(getEntityKey(objId));
+
+                recordEntityOperation(objId.getTypeName(), "DELETE");
+            }
+        }
+    }
+
+    /**
+     * Classifies an entity as CREATE when its key is seen for the first time in
+     * this run, or UPDATE otherwise, and records that operation for its type.
+     *
+     * @param entity V2 entity taken from a notification
+     */
+    private void recordEntity(AtlasEntity entity) {
+        // Set.add() returns true only when the key was not already present
+        final boolean firstSighting = knownEntities.add(getEntityKey(entity));
+
+        recordEntityOperation(entity.getTypeName(), firstSighting ? "CREATE" : "UPDATE");
+    }
+
+    /**
+     * V1 counterpart of the entity recorder: an entity whose key has not been
+     * seen before is counted as CREATE, any later occurrence as UPDATE.
+     *
+     * @param entity V1 entity taken from a notification
+     */
+    private void recordEntity(Referenceable entity) {
+        final String  entityKey = getEntityKey(entity);
+        final boolean isNew     = knownEntities.add(entityKey);
+
+        if (isNew) {
+            recordEntityOperation(entity.getTypeName(), "CREATE");
+        } else {
+            recordEntityOperation(entity.getTypeName(), "UPDATE");
+        }
+    }
+
+    /**
+     * Updates the counters for one entity operation.
+     * <ul>
+     *   <li>the overall entity count is incremented for CREATE operations only;</li>
+     *   <li>the per-type, per-operation and per-operation-per-type counters are
+     *       incremented for every operation.</li>
+     * </ul>
+     *
+     * @param entityTypeName type name of the entity the operation applies to
+     * @param operation      operation label, e.g. CREATE, UPDATE, PARTIAL_UPDATE, DELETE
+     */
+    private void recordEntityOperation(String entityTypeName, String operation) {
+        if (operation.equals("CREATE")) {
+            entityCount.incrementAndGet();
+        }
+
+        AtomicInteger typeCounter = entityCountByType.computeIfAbsent(entityTypeName, k -> new AtomicInteger());
+        typeCounter.incrementAndGet();
+
+        AtomicInteger operCounter = entityOperCount.computeIfAbsent(operation, k -> new AtomicInteger());
+        operCounter.incrementAndGet();
+
+        // nested map: operation -> (type name -> count), created lazily
+        Map<String, AtomicInteger> countsByType = entityOperByTypeCount.computeIfAbsent(operation, k -> new TreeMap<>());
+        countsByType.computeIfAbsent(entityTypeName, k -> new AtomicInteger()).incrementAndGet();
+    }
+
+    private String getEntityKey(AtlasEntity entity) {
+        return entity.getTypeName() + ":" + 
getUniqueKey(entity.getAttributes());
+    }
+
+    /**
+     * Builds the tracking key for an object-id, in the form
+     * {@code <typeName>:<uniqueKey>}, where the unique key is looked up in the
+     * object-id's unique attributes.
+     *
+     * @param objectId object-id taken from a notification
+     * @return tracking key for the referenced entity
+     */
+    private String getEntityKey(AtlasObjectId objectId) {
+        StringBuilder key = new StringBuilder();
+
+        key.append(objectId.getTypeName());
+        key.append(":");
+        key.append(getUniqueKey(objectId.getUniqueAttributes()));
+
+        return key.toString();
+    }
+
+    private String getEntityKey(Referenceable entity) {
+        return entity.getTypeName() + ":" + getUniqueKey(entity.getValues());
+    }
+
+    /**
+     * Builds the tracking key for the entity addressed by a V1 delete request,
+     * in the form {@code <typeName>:<attributeValue>}.
+     *
+     * @param request V1 delete notification
+     * @return tracking key for the entity to be deleted
+     */
+    private String getEntityKey(EntityDeleteRequest request) {
+        StringBuilder key = new StringBuilder();
+
+        key.append(request.getTypeName()).append(":").append(request.getAttributeValue());
+
+        return key.toString();
+    }
+
+    /**
+     * Picks the value that identifies an entity within its type: the
+     * {@code qualifiedName} attribute when present, otherwise {@code name}.
+     *
+     * @param attributes attribute map of an entity or object-id; may be null
+     * @return the identifying value, or null when neither attribute is set or
+     *         when no attribute map is available
+     */
+    private Object getUniqueKey(Map<String, Object> attributes) {
+        // a missing attribute map must not abort the analysis with a NullPointerException
+        if (attributes == null) {
+            return null;
+        }
+
+        Object ret = attributes.get("qualifiedName");
+
+        return ret == null ? attributes.get("name") : ret;
+    }
+
+    /**
+     * Opens the notification message file for line-by-line reading.
+     *
+     * The file is decoded explicitly as UTF-8 rather than with the platform
+     * default charset, so non-ASCII text is read the same way on every host.
+     * NOTE(review): assumes the message file holds UTF-8 JSON - confirm.
+     *
+     * @return buffered reader over {@code msgFile}; the caller must close it
+     * @throws IOException if the file cannot be opened
+     */
+    private BufferedReader getInputReader() throws IOException {
+        // fully-qualified names keep this method independent of the file's import list
+        return new BufferedReader(new java.io.InputStreamReader(new java.io.FileInputStream(msgFile), java.nio.charset.StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Creates the writer that receives the analysis output.
+     *
+     * @return a writer on standard output when no output file is configured,
+     *         otherwise a writer that appends to {@code outputFile}
+     * @throws IOException if the output file cannot be opened
+     */
+    private PrintWriter getOutputWriter() throws IOException {
+        if (outputFile == null || outputFile.isEmpty()) {
+            return new PrintWriter(System.out);
+        }
+
+        // second argument 'true' opens the file in append mode
+        return new PrintWriter(new FileWriter(outputFile, true));
+    }
+}
diff --git 
a/tools/notification-analyzer/src/main/resources/atlas-application.properties 
b/tools/notification-analyzer/src/main/resources/atlas-application.properties
new file mode 100644
index 000000000..01d705720
--- /dev/null
+++ 
b/tools/notification-analyzer/src/main/resources/atlas-application.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
diff --git a/tools/notification-analyzer/src/main/resources/atlas-log4j.xml 
b/tools/notification-analyzer/src/main/resources/atlas-log4j.xml
new file mode 100755
index 000000000..0f9182f36
--- /dev/null
+++ b/tools/notification-analyzer/src/main/resources/atlas-log4j.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+    <appender name="console" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
+        </layout>
+    </appender>
+
+    <appender name="FILE" class="org.apache.log4j.RollingFileAppender">
+        <param name="File" value="${atlas.log.dir}/${atlas.log.file}"/>
+        <param name="Append" value="true"/>
+        <param name="maxFileSize" value="100MB" />
+        <param name="maxBackupIndex" value="20" />
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
+        </layout>
+    </appender>
+
+    <root>
+        <priority value="info"/>
+        <appender-ref ref="FILE"/>
+    </root>
+</log4j:configuration>


Reply via email to