Repository: incubator-ignite Updated Branches: refs/heads/ignite-sprint-4 2a68725ed -> 3a2e2d21e
# ignite-690 WIP Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3a096e8c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3a096e8c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3a096e8c Branch: refs/heads/ignite-sprint-4 Commit: 3a096e8cd3ec19885bb4532bc7f1dd6d06f8da96 Parents: 0eadae9 Author: Andrey <anovi...@gridgain.com> Authored: Wed Apr 15 18:02:28 2015 +0700 Committer: Andrey <anovi...@gridgain.com> Committed: Wed Apr 15 18:02:28 2015 +0700 ---------------------------------------------------------------------- .../visor/commands/ack/VisorAckCommand.scala | 7 +--- .../commands/cache/VisorCacheClearCommand.scala | 7 +--- .../commands/cache/VisorCacheCommand.scala | 6 +-- .../scala/org/apache/ignite/visor/visor.scala | 41 +++++++++++++------- 4 files changed, 32 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a096e8c/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/ack/VisorAckCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/ack/VisorAckCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/ack/VisorAckCommand.scala index 7ef43f5..6fa96d0 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/ack/VisorAckCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/ack/VisorAckCommand.scala @@ -105,12 +105,7 @@ class VisorAckCommand { adviseToConnect() else try { - val nodeIds = ignite.cluster().nodes().map(_.id()) - - ignite.compute(ignite.cluster().forNodeIds(nodeIds)) - .withName("visor-ack") - .withNoFailover() - .execute(classOf[VisorAckTask], toTaskArgument(nodeIds, msg)) + executeAll(classOf[VisorAckTask], msg) } catch { case _: ClusterGroupEmptyException => scold("Topology is empty.") http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a096e8c/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala index 379b566..8107f1f 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala @@ -100,7 +100,7 @@ class VisorCacheClearCommand { case Some(name) => name } - val prj = if (node.isDefined) ignite.cluster.forNode(node.get) else ignite.cluster.forCacheNodes(cacheName) + val prj = node.fold(ignite.cluster.forCacheNodes(cacheName))(ignite.cluster.forNode(_)) if (prj.nodes().isEmpty) { val msg = @@ -117,10 +117,7 @@ class VisorCacheClearCommand { t #= ("Node ID8(@)", "Cache Size Before", "Cache Size After") prj.nodes().headOption.foreach(node => { - val res = ignite.compute(ignite.cluster.forNode(node)) - .withName("visor-cclear-task") - .withNoFailover() - .execute(classOf[VisorCacheClearTask], toTaskArgument(node.id(), cacheName)) + val res = executeOne(node.id(), classOf[VisorCacheClearTask], cacheName) t += (nodeId8(node.id()), res.get1(), res.get2()) }) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a096e8c/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala index b8f2a35..f45597e 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala @@ -499,11 +499,11 @@ class VisorCacheCommand { val nids = prj.nodes().map(_.id()) - val caches: JavaCollection[String] = new JavaList[String]() + val caches: JavaCollection[String] = new JavaList[String](1) name.foreach(caches.add) - ignite.compute(prj).execute(classOf[VisorCacheMetricsCollectorTask], toTaskArgument(nids, - new IgniteBiTuple(JavaBoolean.valueOf(systemCaches), caches))).toList + executeMulti(nids, classOf[VisorCacheMetricsCollectorTask], + new IgniteBiTuple(JavaBoolean.valueOf(systemCaches), caches)).toList } catch { case e: IgniteException => Nil http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a096e8c/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala index b9981f1..6be3f0e 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala @@ -32,7 +32,7 @@ import org.apache.ignite.internal.util.lang.{GridFunc => F} import org.apache.ignite.internal.util.typedef._ import org.apache.ignite.internal.util.{GridConfigurationFinder, IgniteUtils => U} import org.apache.ignite.logger.NullLogger -import org.apache.ignite.internal.visor.VisorTaskArgument +import org.apache.ignite.internal.visor.{VisorMultiNodeTask, VisorTaskArgument} import org.apache.ignite.internal.visor.cache._ import org.apache.ignite.internal.visor.node._ import org.apache.ignite.internal.visor.node.VisorNodeEventsCollectorTask.VisorNodeEventsCollectorTaskArg @@ -1841,7 +1841,23 @@ object visor extends VisorTag { def toTaskArgument[A](nids: Iterable[UUID], arg: A): VisorTaskArgument[A] = new VisorTaskArgument(new JavaHashSet(nids), arg, false) - def compute(nid: UUID): IgniteCompute = ignite.compute(ignite.cluster.forNodeId(nid)).withNoFailover() + private def execute[A, R, J](grp: ClusterGroup, task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = + ignite.compute(grp).withNoFailover().execute(task, toTaskArgument(grp.nodes().map(_.id()), arg)) + + def executeOne[A, R, J](nid: UUID, task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = + execute(ignite.cluster.forNodeId(nid), task, arg) + + def executeMulti[A, R, J](nids: Iterable[UUID], task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = + execute(ignite.cluster.forNodeIds(nids), task, arg) + + def executeLocal[A, R, J](task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = + execute(ignite.cluster.forLocal(), task, arg) + + def executeRemotes[A, R, J](task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = + execute(ignite.cluster.forRemotes(), task, arg) + + def executeAll[A, R, J](task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = + execute(ignite.cluster, task, arg) /** * Gets configuration from specified node. @@ -1850,7 +1866,7 @@ object visor extends VisorTag { * @return Grid configuration. */ def nodeConfiguration(nid: UUID): VisorGridConfiguration = - compute(nid).execute(classOf[VisorNodeConfigurationCollectorTask], emptyTaskArgument(nid)) + executeOne(nid, classOf[VisorNodeConfigurationCollectorTask], null) /** * Gets caches configurations from specified node. @@ -1859,8 +1875,8 @@ object visor extends VisorTag { * @return Collection of cache configurations. */ def cacheConfigurations(nid: UUID): JavaCollection[VisorCacheConfiguration] = - compute(nid).execute(classOf[VisorCacheConfigurationCollectorTask], - toTaskArgument(nid, null.asInstanceOf[JavaCollection[IgniteUuid]])).values() + executeOne(nid, classOf[VisorCacheConfigurationCollectorTask], + null.asInstanceOf[JavaCollection[IgniteUuid]]).values() /** * Asks user to select a node from the list. @@ -2459,17 +2475,12 @@ object visor extends VisorTag { if (g != null) { try { // Discovery events collected only locally. - val loc = g.compute(g.cluster.forLocal()).withName("visor-log-collector").withNoFailover(). - execute(classOf[VisorNodeEventsCollectorTask], toTaskArgument(g.localNode().id(), - VisorNodeEventsCollectorTaskArg.createLogArg(key, LOG_EVTS ++ EVTS_DISCOVERY))).toSeq - - val evts = if (!rmtLogDisabled) { - val prj = g.cluster.forRemotes() + val loc = executeLocal(classOf[VisorNodeEventsCollectorTask], + VisorNodeEventsCollectorTaskArg.createLogArg(key, LOG_EVTS ++ EVTS_DISCOVERY)).toSeq - loc ++ g.compute(prj).withName("visor-log-collector").withNoFailover(). - execute(classOf[VisorNodeEventsCollectorTask], toTaskArgument(prj.nodes().map(_.id()), - VisorNodeEventsCollectorTaskArg.createLogArg(key, LOG_EVTS))).toSeq - } + val evts = if (!rmtLogDisabled) + loc ++ executeRemotes(classOf[VisorNodeEventsCollectorTask], + VisorNodeEventsCollectorTaskArg.createLogArg(key, LOG_EVTS)).toSeq else loc