# IGNITE-274 Reworked events collecting.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/677e643b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/677e643b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/677e643b

Branch: refs/heads/ignite-223
Commit: 677e643b217ecbc92c081711a688fde087e42949
Parents: 659b432
Author: AKuznetsov <akuznet...@gridgain.com>
Authored: Tue Feb 17 14:24:28 2015 +0700
Committer: AKuznetsov <akuznet...@gridgain.com>
Committed: Tue Feb 17 14:24:28 2015 +0700

----------------------------------------------------------------------
 .../visor/node/VisorNodeDataCollectorJob.java   | 50 +++++++++++---
 .../internal/visor/util/VisorEventMapper.java   | 73 ++++++++++++++++++++
 .../internal/visor/util/VisorTaskUtils.java     | 60 ++++++++--------
 3 files changed, 142 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/677e643b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
index 2bedf10..5cb9039 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
@@ -55,8 +55,26 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa
         super(arg, debug);
     }
 
-    /** Collect events. */
-    private void events(VisorNodeDataCollectorJobResult res, VisorNodeDataCollectorTaskArg arg) {
+    /**
+     * Collect events.
+     *
+     * @param res Job result.
+     * @param evtOrderKey Unique key to take last order key from node local map.
+     * @param evtThrottleCntrKey  Unique key to take throttle count from node local map.
+     * @param all If {@code true} then collect all events otherwise collect only non task events.
+     */
+    protected void events0(VisorNodeDataCollectorJobResult res, String evtOrderKey, String evtThrottleCntrKey,
+        final boolean all) {
+        res.events().addAll(collectEvents(ignite, evtOrderKey, evtThrottleCntrKey, all));
+    }
+
+    /**
+     * Collect events.
+     *
+     * @param res Job result.
+     * @param arg Task argument.
+     */
+    protected void events(VisorNodeDataCollectorJobResult res, VisorNodeDataCollectorTaskArg arg) {
         try {
             // Visor events explicitly enabled in configuration.
             if (checkExplicitTaskMonitoring(ignite))
@@ -86,16 +104,20 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa
                 }
             }
 
-            res.events().addAll(collectEvents(ignite, arg.eventsOrderKey(), arg.eventsThrottleCounterKey(),
-                arg.taskMonitoringEnabled()));
+            events0(res, arg.eventsOrderKey(), arg.eventsThrottleCounterKey(), arg.taskMonitoringEnabled());
         }
         catch (Throwable eventsEx) {
             res.eventsEx(eventsEx);
         }
     }
 
-    /** Collect caches. */
-    private void caches(VisorNodeDataCollectorJobResult res, VisorNodeDataCollectorTaskArg arg) {
+    /**
+     * Collect caches.
+     *
+     * @param res Job result.
+     * @param arg Task argument.
+     */
+    protected void caches(VisorNodeDataCollectorJobResult res, VisorNodeDataCollectorTaskArg arg) {
         try {
             IgniteConfiguration cfg = ignite.configuration();
 
@@ -120,8 +142,12 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa
         }
     }
 
-    /** Collect IGFS. */
-    private void igfs(VisorNodeDataCollectorJobResult res) {
+    /**
+     * Collect IGFSs.
+     *
+     * @param res Job result.
+     */
+    protected void igfs(VisorNodeDataCollectorJobResult res) {
         try {
             IgfsProcessorAdapter igfsProc = ((IgniteKernal)ignite).context().igfs();
 
@@ -151,8 +177,12 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa
         }
     }
 
-    /** Collect streamers. */
-    private void streamers(VisorNodeDataCollectorJobResult res) {
+    /**
+     * Collect streamers.
+     *
+     * @param res Job result.
+     */
+    protected void streamers(VisorNodeDataCollectorJobResult res) {
         try {
             StreamerConfiguration[] cfgs = ignite.configuration().getStreamerConfiguration();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/677e643b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorEventMapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorEventMapper.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorEventMapper.java
new file mode 100644
index 0000000..574d4ff
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorEventMapper.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.util;
+
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.visor.event.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+
+/**
+ * Mapper from grid event to Visor data transfer object.
+ */
+public class VisorEventMapper implements IgniteClosure<Event, VisorGridEvent> {
+    /**
+     * Map grid event to Visor data transfer object.
+     *
+     * @param evt Grid event.
+     * @param type Event's type.
+     * @param id Event id.
+     * @param name Event name.
+     * @param nid Event node ID.
+     * @param ts Event timestamp.
+     * @param msg Event message.
+     * @param shortDisplay Shortened version of {@code toString()} result.
+     * @return Visor data transfer object for event.
+     */
+    protected VisorGridEvent map(Event evt, int type, IgniteUuid id, String name, UUID nid, long ts, String msg,
+        String shortDisplay) {
+        if (evt instanceof TaskEvent) {
+            TaskEvent te = (TaskEvent)evt;
+
+            return new VisorGridTaskEvent(type, id, name, nid, ts, msg, shortDisplay,
+                te.taskName(), te.taskClassName(), te.taskSessionId(), te.internal());
+        }
+
+        if (evt instanceof JobEvent) {
+            JobEvent je = (JobEvent)evt;
+
+            return new VisorGridJobEvent(type, id, name, nid, ts, msg, shortDisplay,
+                je.taskName(), je.taskClassName(), je.taskSessionId(), je.jobId());
+        }
+
+        if (evt instanceof DeploymentEvent) {
+            DeploymentEvent de = (DeploymentEvent)evt;
+
+            return new VisorGridDeploymentEvent(type, id, name, nid, ts, msg, shortDisplay, de.alias());
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public VisorGridEvent apply(Event evt) {
+        return map(evt, evt.type(), evt.id(), evt.name(), evt.node().id(), evt.timestamp(), evt.message(),
+            evt.shortDisplay());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/677e643b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
index 728b569..4be371c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
@@ -83,13 +83,13 @@ public class VisorTaskUtils {
     };
 
     /** Only non task event types that Visor should collect. */
-    private static final int[] VISOR_NON_TASK_EVTS = {
+    public static final int[] VISOR_NON_TASK_EVTS = {
         EVT_CLASS_DEPLOY_FAILED,
         EVT_TASK_DEPLOY_FAILED
     };
 
     /** Only non task event types that Visor should collect. */
-    private static final int[] VISOR_ALL_EVTS = concat(VISOR_TASK_EVTS, VISOR_NON_TASK_EVTS);
+    public static final int[] VISOR_ALL_EVTS = concat(VISOR_TASK_EVTS, VISOR_NON_TASK_EVTS);
 
     /** Maximum folder depth. I.e. if depth is 4 we look in starting folder and 3 levels of sub-folders. */
     public static final int MAX_FOLDER_DEPTH = 4;
@@ -321,13 +321,16 @@ public class VisorTaskUtils {
         return true;
     }
 
-    /** */
-    private static final Comparator<Event> EVENTS_ORDER_COMPARATOR = new Comparator<Event>() {
+    /** Events comparator by event local order. */
+    private static final Comparator<Event> EVTS_ORDER_COMPARATOR = new Comparator<Event>() {
         @Override public int compare(Event o1, Event o2) {
             return Long.compare(o1.localOrder(), o2.localOrder());
         }
     };
 
+    /** Mapper from grid event to Visor data transfer object. */
+    private static final VisorEventMapper EVT_MAPPER = new VisorEventMapper();
+
     /**
      * Grabs local events and detects if events was lost since last poll.
      *
@@ -339,7 +342,24 @@ public class VisorTaskUtils {
      */
     public static Collection<VisorGridEvent> collectEvents(Ignite ignite, String evtOrderKey, String evtThrottleCntrKey,
         final boolean all) {
+        return collectEvents(ignite, evtOrderKey, evtThrottleCntrKey, all ? VISOR_ALL_EVTS : VISOR_NON_TASK_EVTS,
+            EVT_MAPPER);
+    }
+
+    /**
+     * Grabs local events and detects if events was lost since last poll.
+     *
+     * @param ignite Target grid.
+     * @param evtOrderKey Unique key to take last order key from node local map.
+     * @param evtThrottleCntrKey  Unique key to take throttle count from node local map.
+     * @param evtTypes Event types to collect.
+     * @param evtMapper Closure to map grid events to Visor data transfer objects.
+     * @return Collections of node events
+     */
+    public static Collection<VisorGridEvent> collectEvents(Ignite ignite, String evtOrderKey, String evtThrottleCntrKey,
+        final int[] evtTypes, IgniteClosure<Event, VisorGridEvent> evtMapper) {
         assert ignite != null;
+        assert evtTypes != null && evtTypes.length > 0;
 
         ClusterNodeLocalMap<String, Long> nl = ignite.cluster().nodeLocalMap();
 
@@ -361,8 +381,7 @@ public class VisorTaskUtils {
                     lastFound.set(true);
 
                 // Retains events by lastOrder, period and type.
-                return e.localOrder() > lastOrder && e.timestamp() > notOlderThan &&
-                    (all ? F.contains(VISOR_ALL_EVTS, e.type()) : F.contains(VISOR_NON_TASK_EVTS, e.type()));
+                return e.localOrder() > lastOrder && e.timestamp() > notOlderThan && F.contains(evtTypes, e.type());
             }
         };
 
@@ -370,7 +389,7 @@ public class VisorTaskUtils {
 
         // Update latest order in node local, if not empty.
         if (!evts.isEmpty()) {
-            Event maxEvt = Collections.max(evts, EVENTS_ORDER_COMPARATOR);
+            Event maxEvt = Collections.max(evts, EVTS_ORDER_COMPARATOR);
 
             nl.put(evtOrderKey, maxEvt.localOrder());
         }
@@ -387,31 +406,10 @@ public class VisorTaskUtils {
             res.add(new VisorGridEventsLost(ignite.cluster().localNode().id()));
 
         for (Event e : evts) {
-            int tid = e.type();
-            IgniteUuid id = e.id();
-            String name = e.name();
-            UUID nid = e.node().id();
-            long t = e.timestamp();
-            String msg = e.message();
-            String shortDisplay = e.shortDisplay();
-
-            if (e instanceof TaskEvent) {
-                TaskEvent te = (TaskEvent)e;
-
-                res.add(new VisorGridTaskEvent(tid, id, name, nid, t, msg, shortDisplay,
-                    te.taskName(), te.taskClassName(), te.taskSessionId(), te.internal()));
-            }
-            else if (e instanceof JobEvent) {
-                JobEvent je = (JobEvent)e;
-
-                res.add(new VisorGridJobEvent(tid, id, name, nid, t, msg, shortDisplay,
-                    je.taskName(), je.taskClassName(), je.taskSessionId(), je.jobId()));
-            }
-            else if (e instanceof DeploymentEvent) {
-                DeploymentEvent de = (DeploymentEvent)e;
+            VisorGridEvent visorEvt = evtMapper.apply(e);
 
-                res.add(new VisorGridDeploymentEvent(tid, id, name, nid, t, msg, shortDisplay, de.alias()));
-            }
+            if (visorEvt != null)
+                res.add(visorEvt);
         }
 
         return res;

Reply via email to