# ignite-690 WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cd75a764 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cd75a764 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cd75a764 Branch: refs/heads/ignite-sprint-4 Commit: cd75a764621e26b4d8acee4473ea2358f6bdf0c2 Parents: 7d28161 Author: Andrey <anovi...@gridgain.com> Authored: Thu Apr 16 11:10:36 2015 +0700 Committer: Andrey <anovi...@gridgain.com> Committed: Thu Apr 16 11:10:36 2015 +0700 ---------------------------------------------------------------------- .../commands/cache/VisorCacheClearCommand.scala | 13 ++------- .../commands/cache/VisorCacheScanCommand.scala | 29 ++++---------------- .../commands/cache/VisorCacheStopCommand.scala | 22 +-------------- .../commands/cache/VisorCacheSwapCommand.scala | 15 ++++------ .../commands/disco/VisorDiscoveryCommand.scala | 12 ++++---- .../commands/events/VisorEventsCommand.scala | 4 +-- .../visor/commands/gc/VisorGcCommand.scala | 21 ++++++-------- .../commands/tasks/VisorTasksCommand.scala | 28 +++++++------------ .../scala/org/apache/ignite/visor/visor.scala | 19 +++++++++---- 9 files changed, 55 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd75a764/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala index 8107f1f..ebdaa34 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala @@ -100,17 +100,10 @@ class VisorCacheClearCommand { case Some(name) => name } - val prj = node.fold(ignite.cluster.forCacheNodes(cacheName))(ignite.cluster.forNode(_)) + val prj = node.fold(ignite.cluster.forRandom())(ignite.cluster.forNode(_)) - if (prj.nodes().isEmpty) { - val msg = - if (cacheName == null) - "Can't find nodes with default cache." - else - "Can't find nodes with specified cache: " + cacheName - - scold(msg).^^ - } + if (prj.nodes().isEmpty) + scold(node.fold("Topology is empty.")(n => "Can't find node with specified id: " + n.id())).^^ val t = VisorTextTable() http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd75a764/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala index 3490669..405a4d7 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala @@ -136,31 +136,17 @@ class VisorCacheScanCommand { case Some(name) => name } - val cachePrj = node match { - case Some(n) => ignite.cluster.forNode(n).forCacheNodes(cacheName) - case _ => ignite.cluster.forCacheNodes(cacheName) - } + val n = node.fold(ignite.cluster.forRandom())(ignite.cluster.forNode(_)).node() - if (cachePrj.nodes().isEmpty) { - warn("Can't find nodes with specified cache: " + cacheName, - "Type 'cache' to see available cache names." - ) + if (n == null) { + scold(node.fold("Topology is empty.")(n => "Can't find node with specified id: " + n.id())).^^ return } - val qryPrj = cachePrj.forRandom() - val proj = new java.util.HashSet(cachePrj.nodes().map(_.id())) - - val nid = qryPrj.node().id() - val fullRes = try - ignite.compute(qryPrj) - .withName("visor-cscan-task") - .withNoFailover() - .execute(classOf[VisorQueryTask], - toTaskArgument(nid, new VisorQueryArg(null, cacheName, "SCAN", pageSize))) + executeOne(n.id(), classOf[VisorQueryTask], new VisorQueryArg(null, cacheName, "SCAN", pageSize)) match { case x if x.get1() != null => error(x.get1()) @@ -201,11 +187,8 @@ class VisorCacheScanCommand { ask("\nFetch more objects (y/n) [y]:", "y") match { case "y" | "Y" => try { - res = ignite.compute(qryPrj) - .withName("visor-cscan-fetch-task") - .withNoFailover() - .execute(classOf[VisorQueryNextPageTask], - toTaskArgument(nid, new IgniteBiTuple[String, Integer](fullRes.queryId(), pageSize))) + res = executeOne(n.id(), classOf[VisorQueryNextPageTask], + new IgniteBiTuple[String, Integer](fullRes.queryId(), pageSize)) render() } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd75a764/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheStopCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheStopCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheStopCommand.scala index cf344e6..690ee13 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheStopCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheStopCommand.scala @@ -92,30 +92,10 @@ class VisorCacheStopCommand { case Some(name) => name } - val cachePrj = node match { - case Some(n) => ignite.cluster.forNode(n).forCacheNodes(cacheName) - case _ => ignite.cluster.forCacheNodes(cacheName) - } - - if (cachePrj.nodes().isEmpty) { - warn("Can't find nodes with specified cache: " + escapeName(cacheName), - "Type 'cache' to see available cache names." - ) - - return - } - ask(s"Are you sure you want to stop cache: ${escapeName(cacheName)}? (y/n) [n]: ", "n") match { case "y" | "Y" => - val stopPrj = cachePrj.forRandom() - - val nid = stopPrj.node().id() - try { - ignite.compute(stopPrj) - .withName("visor-cstop-task") - .withNoFailover() - .execute(classOf[VisorCacheStopTask], toTaskArgument(nid, cacheName)) + executeRandom(classOf[VisorCacheStopTask], cacheName) println("Visor successfully stop cache: " + escapeName(cacheName)) } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd75a764/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala index b946fc3..0a20166 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala @@ -103,7 +103,8 @@ class VisorCacheSwapCommand { case Some(name) => name } - val prj = if (node.isDefined) ignite.cluster.forNode(node.get) else ignite.cluster.forCacheNodes(cacheName) + + val prj = node.fold(ignite.cluster.forRandom())(ignite.cluster.forNode(_)) if (prj.nodes().isEmpty) { val msg = @@ -119,17 +120,11 @@ class VisorCacheSwapCommand { t #= ("Node ID8(@)", "Entries Swapped", "Cache Size Before", "Cache Size After") - val cacheSet = Collections.singleton(cacheName) + val nid = prj.node().id() - prj.nodes().foreach(node => { - val r = ignite.compute(ignite.cluster.forNode(node)) - .withName("visor-cswap-task") - .withNoFailover() - .execute(classOf[VisorCacheSwapBackupsTask], toTaskArgument(node.id(), cacheSet)) - .get(cacheName) + val r = executeOne(nid, classOf[VisorCacheSwapBackupsTask], Collections.singleton(cacheName)).get(cacheName) - t += (nodeId8(node.id()), r.get1() - r.get2(), r.get1(), r.get2()) - }) + t += (nodeId8(nid), r.get1() - r.get2(), r.get1(), r.get2()) println("Swapped entries in cache: " + escapeName(cacheName)) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd75a764/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/disco/VisorDiscoveryCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/disco/VisorDiscoveryCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/disco/VisorDiscoveryCommand.scala index 3b6ca7e..3dd3784 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/disco/VisorDiscoveryCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/disco/VisorDiscoveryCommand.scala @@ -21,6 +21,8 @@ import org.apache.ignite.cluster.ClusterNode import org.apache.ignite.events.EventType._ import org.apache.ignite.internal.util.lang.{GridFunc => F} import org.apache.ignite.internal.util.{IgniteUtils => U} +import java.util.UUID + import org.apache.ignite.internal.visor.event.VisorGridDiscoveryEvent import org.apache.ignite.internal.visor.node.VisorNodeEventsCollectorTask import org.apache.ignite.internal.visor.node.VisorNodeEventsCollectorTask.VisorNodeEventsCollectorTaskArg @@ -143,7 +145,7 @@ class VisorDiscoveryCommand { return } - val oldest = ignite.cluster.nodes().maxBy(_.metrics().getUpTime) + val node = ignite.cluster.forOldest().node() val cntOpt = argValue("c", argLst) @@ -157,11 +159,11 @@ class VisorDiscoveryCommand { return } - println("Oldest alive node in grid: " + nodeId8Addr(oldest.id)) + println("Oldest alive node in grid: " + nodeId8Addr(node.id())) val evts = try - events(oldest, tm, hasArgFlag("r", argLst)) + events(node, tm, hasArgFlag("r", argLst)) catch { case e: Throwable => scold(e.getMessage) @@ -217,8 +219,8 @@ class VisorDiscoveryCommand { assert(node != null) assert(!node.isDaemon) - var evts = ignite.compute(ignite.cluster.forNode(node)).execute(classOf[VisorNodeEventsCollectorTask], - toTaskArgument(node.id(), VisorNodeEventsCollectorTaskArg.createEventsArg(EVTS_DISCOVERY, tmFrame))).toSeq + var evts = executeOne(node.id(), classOf[VisorNodeEventsCollectorTask], + VisorNodeEventsCollectorTaskArg.createEventsArg(EVTS_DISCOVERY, tmFrame)).toSeq val nodeStartTime = node.metrics().getStartTime http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd75a764/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/events/VisorEventsCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/events/VisorEventsCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/events/VisorEventsCommand.scala index 01d8b14..ad507c8 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/events/VisorEventsCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/events/VisorEventsCommand.scala @@ -283,8 +283,8 @@ class VisorEventsCommand { } val evts = try - ignite.compute(ignite.cluster.forNode(node)).execute(classOf[VisorNodeEventsCollectorTask], - toTaskArgument(nid, VisorNodeEventsCollectorTaskArg.createEventsArg(tpFilter, tmFilter))) + executeOne(nid, classOf[VisorNodeEventsCollectorTask], + VisorNodeEventsCollectorTaskArg.createEventsArg(tpFilter, tmFilter)) catch { case e: IgniteException => scold(e.getMessage) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd75a764/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/gc/VisorGcCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/gc/VisorGcCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/gc/VisorGcCommand.scala index 5ec85a7..1c15528 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/gc/VisorGcCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/gc/VisorGcCommand.scala @@ -141,24 +141,19 @@ class VisorGcCommand { t #= ("Node ID8(@)", "Free Heap Before", "Free Heap After", "Free Heap Delta") - val prj = ignite.cluster.forRemotes() - - val nids = prj.nodes().map(_.id()) - val NULL: Void = null - ignite.compute(prj).withNoFailover().execute(classOf[VisorNodeGcTask], - toTaskArgument(nids, NULL)).foreach { - case (nid, stat) => - val roundHb = stat.get1() / (1024L * 1024L) - val roundHa = stat.get2() / (1024L * 1024L) + executeRemotes(classOf[VisorNodeGcTask], NULL).foreach { + case (nid, stat) => + val roundHb = stat.get1() / (1024L * 1024L) + val roundHa = stat.get2() / (1024L * 1024L) - val sign = if (roundHa > roundHb) "+" else "" + val sign = if (roundHa > roundHb) "+" else "" - val deltaPercent = math.round(roundHa * 100d / roundHb - 100) + val deltaPercent = math.round(roundHa * 100d / roundHb - 100) - t += (nodeId8(nid), roundHb + "mb", roundHa + "mb", sign + deltaPercent + "%") - } + t += (nodeId8(nid), roundHb + "mb", roundHa + "mb", sign + deltaPercent + "%") + } println("Garbage collector procedure results:") http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd75a764/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala index 5feae8c..515864d 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala @@ -613,10 +613,8 @@ class VisorTasksCommand { private def list(p: Long, taskName: String, reverse: Boolean, all: Boolean) { breakable { try { - val prj = ignite.cluster.forRemotes() - - val evts = ignite.compute(prj).execute(classOf[VisorNodeEventsCollectorTask], - toTaskArgument(prj.nodes.map(_.id()), VisorNodeEventsCollectorTaskArg.createTasksArg(p, taskName, null))) + val evts = executeRemotes(classOf[VisorNodeEventsCollectorTask], + VisorNodeEventsCollectorTaskArg.createTasksArg(p, taskName, null)) val (tLst, eLst) = mkData(evts) @@ -819,8 +817,8 @@ class VisorTasksCommand { try { val prj = ignite.cluster.forRemotes() - val evts = ignite.compute(prj).execute(classOf[VisorNodeEventsCollectorTask], toTaskArgument(prj.nodes.map(_.id()), - VisorNodeEventsCollectorTaskArg.createTasksArg(null, taskName, null))) + val evts = executeRemotes(classOf[VisorNodeEventsCollectorTask], + VisorNodeEventsCollectorTaskArg.createTasksArg(null, taskName, null)) val (tLst, eLst) = mkData(evts) @@ -990,10 +988,8 @@ class VisorTasksCommand { } try { - val prj = ignite.cluster.forRemotes() - - val evts = ignite.compute(prj).execute(classOf[VisorNodeEventsCollectorTask], toTaskArgument(prj.nodes.map(_.id()), - VisorNodeEventsCollectorTaskArg.createTasksArg(null, null, uuid))) + val evts = executeRemotes(classOf[VisorNodeEventsCollectorTask], + VisorNodeEventsCollectorTaskArg.createTasksArg(null, null, uuid)) val (tLst, eLst) = mkData(evts) @@ -1104,10 +1100,8 @@ class VisorTasksCommand { private def nodes(f: Long) { breakable { try { - val prj = ignite.cluster.forRemotes() - - val evts = ignite.compute(prj).execute(classOf[VisorNodeEventsCollectorTask], toTaskArgument(prj.nodes.map(_.id()), - VisorNodeEventsCollectorTaskArg.createTasksArg(f, null, null))) + val evts = executeRemotes(classOf[VisorNodeEventsCollectorTask], + VisorNodeEventsCollectorTaskArg.createTasksArg(f, null, null)) val eLst = mkData(evts)._2 @@ -1216,10 +1210,8 @@ class VisorTasksCommand { private def hosts(f: Long) { breakable { try { - val prj = ignite.cluster.forRemotes() - - val evts = ignite.compute(prj).execute(classOf[VisorNodeEventsCollectorTask], toTaskArgument(prj.nodes.map(_.id()), - VisorNodeEventsCollectorTaskArg.createTasksArg(f, null, null))) + val evts = executeRemotes(classOf[VisorNodeEventsCollectorTask], + VisorNodeEventsCollectorTaskArg.createTasksArg(f, null, null)) val eLst = mkData(evts)._2 http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd75a764/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala index 6be3f0e..431701a 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala @@ -19,7 +19,7 @@ package org.apache.ignite.visor import org.apache.ignite.IgniteSystemProperties._ import org.apache.ignite._ -import org.apache.ignite.cluster.{ClusterGroup, ClusterMetrics, ClusterNode} +import org.apache.ignite.cluster.{ClusterGroupEmptyException, ClusterGroup, ClusterMetrics, ClusterNode} import org.apache.ignite.configuration.IgniteConfiguration import org.apache.ignite.events.EventType._ import org.apache.ignite.events.{DiscoveryEvent, Event} @@ -1831,18 +1831,22 @@ object visor extends VisorTag { /** Convert to task argument. */ def emptyTaskArgument[A](nid: UUID): VisorTaskArgument[Void] = new VisorTaskArgument(nid, false) - def emptyTaskArgument[A](nids: Iterable[UUID]): VisorTaskArgument[Void] - = new VisorTaskArgument(new JavaHashSet(nids), false) + def emptyTaskArgument[A](nids: Iterable[UUID]): VisorTaskArgument[Void] = + new VisorTaskArgument(new JavaHashSet(nids), false) /** Convert to task argument. */ def toTaskArgument[A](nid: UUID, arg: A): VisorTaskArgument[A] = new VisorTaskArgument(nid, arg, false) /** Convert to task argument. */ - def toTaskArgument[A](nids: Iterable[UUID], arg: A): VisorTaskArgument[A] - = new VisorTaskArgument(new JavaHashSet(nids), arg, false) + def toTaskArgument[A](nids: Iterable[UUID], arg: A): VisorTaskArgument[A] = + new VisorTaskArgument(new JavaHashSet(nids), arg, false) + + private def execute[A, R, J](grp: ClusterGroup, task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = { + if (grp.nodes().isEmpty) + throw new ClusterGroupEmptyException("Topology is empty.") - private def execute[A, R, J](grp: ClusterGroup, task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = ignite.compute(grp).withNoFailover().execute(task, toTaskArgument(grp.nodes().map(_.id()), arg)) + } def executeOne[A, R, J](nid: UUID, task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = execute(ignite.cluster.forNodeId(nid), task, arg) @@ -1859,6 +1863,9 @@ object visor extends VisorTag { def executeAll[A, R, J](task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = execute(ignite.cluster, task, arg) + def executeRandom[A, R, J](task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = + execute(ignite.cluster.forRandom(), task, arg) + /** * Gets configuration from specified node. *