Repository: incubator-ignite Updated Branches: refs/heads/sprint-2 c7dad84fb -> f13970678
# IGNITE-274 Reworked events collecting. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/677e643b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/677e643b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/677e643b Branch: refs/heads/sprint-2 Commit: 677e643b217ecbc92c081711a688fde087e42949 Parents: 659b432 Author: AKuznetsov <akuznet...@gridgain.com> Authored: Tue Feb 17 14:24:28 2015 +0700 Committer: AKuznetsov <akuznet...@gridgain.com> Committed: Tue Feb 17 14:24:28 2015 +0700 ---------------------------------------------------------------------- .../visor/node/VisorNodeDataCollectorJob.java | 50 +++++++++++--- .../internal/visor/util/VisorEventMapper.java | 73 ++++++++++++++++++++ .../internal/visor/util/VisorTaskUtils.java | 60 ++++++++-------- 3 files changed, 142 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/677e643b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java index 2bedf10..5cb9039 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java @@ -55,8 +55,26 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa super(arg, debug); } - /** Collect events. */ - private void events(VisorNodeDataCollectorJobResult res, VisorNodeDataCollectorTaskArg arg) { + /** + * Collect events. + * + * @param res Job result. + * @param evtOrderKey Unique key to take last order key from node local map. + * @param evtThrottleCntrKey Unique key to take throttle count from node local map. + * @param all If {@code true} then collect all events otherwise collect only non task events. + */ + protected void events0(VisorNodeDataCollectorJobResult res, String evtOrderKey, String evtThrottleCntrKey, + final boolean all) { + res.events().addAll(collectEvents(ignite, evtOrderKey, evtThrottleCntrKey, all)); + } + + /** + * Collect events. + * + * @param res Job result. + * @param arg Task argument. + */ + protected void events(VisorNodeDataCollectorJobResult res, VisorNodeDataCollectorTaskArg arg) { try { // Visor events explicitly enabled in configuration. if (checkExplicitTaskMonitoring(ignite)) @@ -86,16 +104,20 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa } } - res.events().addAll(collectEvents(ignite, arg.eventsOrderKey(), arg.eventsThrottleCounterKey(), - arg.taskMonitoringEnabled())); + events0(res, arg.eventsOrderKey(), arg.eventsThrottleCounterKey(), arg.taskMonitoringEnabled()); } catch (Throwable eventsEx) { res.eventsEx(eventsEx); } } - /** Collect caches. */ - private void caches(VisorNodeDataCollectorJobResult res, VisorNodeDataCollectorTaskArg arg) { + /** + * Collect caches. + * + * @param res Job result. + * @param arg Task argument. + */ + protected void caches(VisorNodeDataCollectorJobResult res, VisorNodeDataCollectorTaskArg arg) { try { IgniteConfiguration cfg = ignite.configuration(); @@ -120,8 +142,12 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa } } - /** Collect IGFS. */ - private void igfs(VisorNodeDataCollectorJobResult res) { + /** + * Collect IGFSs. + * + * @param res Job result. + */ + protected void igfs(VisorNodeDataCollectorJobResult res) { try { IgfsProcessorAdapter igfsProc = ((IgniteKernal)ignite).context().igfs(); @@ -151,8 +177,12 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa } } - /** Collect streamers. */ - private void streamers(VisorNodeDataCollectorJobResult res) { + /** + * Collect streamers. + * + * @param res Job result. + */ + protected void streamers(VisorNodeDataCollectorJobResult res) { try { StreamerConfiguration[] cfgs = ignite.configuration().getStreamerConfiguration(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/677e643b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorEventMapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorEventMapper.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorEventMapper.java new file mode 100644 index 0000000..574d4ff --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorEventMapper.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.util; + +import org.apache.ignite.events.*; +import org.apache.ignite.internal.visor.event.*; +import org.apache.ignite.lang.*; + +import java.util.*; + +/** + * Mapper from grid event to Visor data transfer object. + */ +public class VisorEventMapper implements IgniteClosure<Event, VisorGridEvent> { + /** + * Map grid event to Visor data transfer object. + * + * @param evt Grid event. + * @param type Event's type. + * @param id Event id. + * @param name Event name. + * @param nid Event node ID. + * @param ts Event timestamp. + * @param msg Event message. + * @param shortDisplay Shortened version of {@code toString()} result. + * @return Visor data transfer object for event. + */ + protected VisorGridEvent map(Event evt, int type, IgniteUuid id, String name, UUID nid, long ts, String msg, + String shortDisplay) { + if (evt instanceof TaskEvent) { + TaskEvent te = (TaskEvent)evt; + + return new VisorGridTaskEvent(type, id, name, nid, ts, msg, shortDisplay, + te.taskName(), te.taskClassName(), te.taskSessionId(), te.internal()); + } + + if (evt instanceof JobEvent) { + JobEvent je = (JobEvent)evt; + + return new VisorGridJobEvent(type, id, name, nid, ts, msg, shortDisplay, + je.taskName(), je.taskClassName(), je.taskSessionId(), je.jobId()); + } + + if (evt instanceof DeploymentEvent) { + DeploymentEvent de = (DeploymentEvent)evt; + + return new VisorGridDeploymentEvent(type, id, name, nid, ts, msg, shortDisplay, de.alias()); + } + + return null; + } + + /** {@inheritDoc} */ + @Override public VisorGridEvent apply(Event evt) { + return map(evt, evt.type(), evt.id(), evt.name(), evt.node().id(), evt.timestamp(), evt.message(), + evt.shortDisplay()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/677e643b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java index 728b569..4be371c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java @@ -83,13 +83,13 @@ public class VisorTaskUtils { }; /** Only non task event types that Visor should collect. */ - private static final int[] VISOR_NON_TASK_EVTS = { + public static final int[] VISOR_NON_TASK_EVTS = { EVT_CLASS_DEPLOY_FAILED, EVT_TASK_DEPLOY_FAILED }; /** Only non task event types that Visor should collect. */ - private static final int[] VISOR_ALL_EVTS = concat(VISOR_TASK_EVTS, VISOR_NON_TASK_EVTS); + public static final int[] VISOR_ALL_EVTS = concat(VISOR_TASK_EVTS, VISOR_NON_TASK_EVTS); /** Maximum folder depth. I.e. if depth is 4 we look in starting folder and 3 levels of sub-folders. */ public static final int MAX_FOLDER_DEPTH = 4; @@ -321,13 +321,16 @@ public class VisorTaskUtils { return true; } - /** */ - private static final Comparator<Event> EVENTS_ORDER_COMPARATOR = new Comparator<Event>() { + /** Events comparator by event local order. */ + private static final Comparator<Event> EVTS_ORDER_COMPARATOR = new Comparator<Event>() { @Override public int compare(Event o1, Event o2) { return Long.compare(o1.localOrder(), o2.localOrder()); } }; + /** Mapper from grid event to Visor data transfer object. */ + private static final VisorEventMapper EVT_MAPPER = new VisorEventMapper(); + /** * Grabs local events and detects if events was lost since last poll. * @@ -339,7 +342,24 @@ public class VisorTaskUtils { */ public static Collection<VisorGridEvent> collectEvents(Ignite ignite, String evtOrderKey, String evtThrottleCntrKey, final boolean all) { + return collectEvents(ignite, evtOrderKey, evtThrottleCntrKey, all ? VISOR_ALL_EVTS : VISOR_NON_TASK_EVTS, + EVT_MAPPER); + } + + /** + * Grabs local events and detects if events was lost since last poll. + * + * @param ignite Target grid. + * @param evtOrderKey Unique key to take last order key from node local map. + * @param evtThrottleCntrKey Unique key to take throttle count from node local map. + * @param evtTypes Event types to collect. + * @param evtMapper Closure to map grid events to Visor data transfer objects. + * @return Collections of node events + */ + public static Collection<VisorGridEvent> collectEvents(Ignite ignite, String evtOrderKey, String evtThrottleCntrKey, + final int[] evtTypes, IgniteClosure<Event, VisorGridEvent> evtMapper) { assert ignite != null; + assert evtTypes != null && evtTypes.length > 0; ClusterNodeLocalMap<String, Long> nl = ignite.cluster().nodeLocalMap(); @@ -361,8 +381,7 @@ public class VisorTaskUtils { lastFound.set(true); // Retains events by lastOrder, period and type. - return e.localOrder() > lastOrder && e.timestamp() > notOlderThan && - (all ? F.contains(VISOR_ALL_EVTS, e.type()) : F.contains(VISOR_NON_TASK_EVTS, e.type())); + return e.localOrder() > lastOrder && e.timestamp() > notOlderThan && F.contains(evtTypes, e.type()); } }; @@ -370,7 +389,7 @@ public class VisorTaskUtils { // Update latest order in node local, if not empty. if (!evts.isEmpty()) { - Event maxEvt = Collections.max(evts, EVENTS_ORDER_COMPARATOR); + Event maxEvt = Collections.max(evts, EVTS_ORDER_COMPARATOR); nl.put(evtOrderKey, maxEvt.localOrder()); } @@ -387,31 +406,10 @@ public class VisorTaskUtils { res.add(new VisorGridEventsLost(ignite.cluster().localNode().id())); for (Event e : evts) { - int tid = e.type(); - IgniteUuid id = e.id(); - String name = e.name(); - UUID nid = e.node().id(); - long t = e.timestamp(); - String msg = e.message(); - String shortDisplay = e.shortDisplay(); - - if (e instanceof TaskEvent) { - TaskEvent te = (TaskEvent)e; - - res.add(new VisorGridTaskEvent(tid, id, name, nid, t, msg, shortDisplay, - te.taskName(), te.taskClassName(), te.taskSessionId(), te.internal())); - } - else if (e instanceof JobEvent) { - JobEvent je = (JobEvent)e; - - res.add(new VisorGridJobEvent(tid, id, name, nid, t, msg, shortDisplay, - je.taskName(), je.taskClassName(), je.taskSessionId(), je.jobId())); - } - else if (e instanceof DeploymentEvent) { - DeploymentEvent de = (DeploymentEvent)e; + VisorGridEvent visorEvt = evtMapper.apply(e); - res.add(new VisorGridDeploymentEvent(tid, id, name, nid, t, msg, shortDisplay, de.alias())); - } + if (visorEvt != null) + res.add(visorEvt); } return res;