# IGNITE-737. Rework Visor code to support ClusterGroup.forXXX(cacheName) on daemon node.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a91429b9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a91429b9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a91429b9 Branch: refs/heads/ignite-sprint-4 Commit: a91429b93677d6ca82b1d85388747b9aa75a91cf Parents: 0f7a7ff Author: AKuznetsov <akuznet...@gridgain.com> Authored: Fri Apr 17 10:44:23 2015 +0700 Committer: AKuznetsov <akuznet...@gridgain.com> Committed: Fri Apr 17 10:44:23 2015 +0700 ---------------------------------------------------------------------- .../internal/visor/VisorTaskArgument.java | 2 - .../visor/cache/VisorCacheNodesTask.java | 74 ++++++++++++++++++++ .../internal/visor/query/VisorQueryArg.java | 17 +---- .../internal/visor/query/VisorQueryCursor.java | 1 - .../internal/visor/query/VisorQueryJob.java | 39 +---------- .../internal/visor/query/VisorQueryTask.java | 44 ------------ .../visor/commands/ack/VisorAckCommand.scala | 2 - .../commands/cache/VisorCacheClearCommand.scala | 37 +++++----- .../commands/cache/VisorCacheCommand.scala | 2 +- .../commands/cache/VisorCacheScanCommand.scala | 26 +++---- .../commands/cache/VisorCacheStopCommand.scala | 14 +++- .../commands/cache/VisorCacheSwapCommand.scala | 15 ++-- .../commands/disco/VisorDiscoveryCommand.scala | 2 - .../scala/org/apache/ignite/visor/visor.scala | 38 ++++++++-- 14 files changed, 162 insertions(+), 151 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a91429b9/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorTaskArgument.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorTaskArgument.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorTaskArgument.java index e029678..1a4e498 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorTaskArgument.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorTaskArgument.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.visor; -import org.apache.ignite.cluster.*; - import java.io.*; import java.util.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a91429b9/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheNodesTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheNodesTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheNodesTask.java new file mode 100644 index 0000000..7b8bca3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheNodesTask.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.cache; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.processors.task.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.visor.*; + +import java.util.*; + +/** + * Task that returns collection of cache data nodes IDs. + */ +@GridInternal +public class VisorCacheNodesTask extends VisorOneNodeTask<String, Collection<UUID>> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected VisorCacheNodesJob job(String arg) { + return new VisorCacheNodesJob(arg, debug); + } + + /** + * Job that collects cluster group for specified cache. + */ + private static class VisorCacheNodesJob extends VisorJob<String, Collection<UUID>> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Create job. + * + * @param cacheName Cache name to clear. + * @param debug Debug flag. + */ + private VisorCacheNodesJob(String cacheName, boolean debug) { + super(cacheName, debug); + } + + /** {@inheritDoc} */ + @Override protected Collection<UUID> run(String cacheName) { + Collection<ClusterNode> nodes = ignite.cluster().forDataNodes(cacheName).nodes(); + + Collection<UUID> res = new ArrayList<>(nodes.size()); + + for (ClusterNode node : nodes) + res.add(node.id()); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(VisorCacheNodesJob.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a91429b9/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java index 2466868..becebda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java @@ -17,10 +17,7 @@ package org.apache.ignite.internal.visor.query; -import org.jetbrains.annotations.*; - import java.io.*; -import java.util.*; /** * Arguments for {@link VisorQueryTask}. @@ -29,9 +26,6 @@ public class VisorQueryArg implements Serializable { /** */ private static final long serialVersionUID = 0L; - /** Optional node ID. */ - private final UUID nid; - /** Cache name for query. */ private final String cacheName; @@ -42,26 +36,17 @@ public class VisorQueryArg implements Serializable { private final int pageSize; /** - * @param nid Optional node ID with cache. * @param cacheName Cache name for query. * @param qryTxt Query text. * @param pageSize Result batch size. */ - public VisorQueryArg(@Nullable UUID nid, String cacheName, String qryTxt, int pageSize) { - this.nid = nid; + public VisorQueryArg(String cacheName, String qryTxt, int pageSize) { this.cacheName = cacheName; this.qryTxt = qryTxt; this.pageSize = pageSize; } /** - * @return Optional node ID. - */ - @Nullable public UUID nodeId() { - return nid; - } - - /** * @return Cache name. */ public String cacheName() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a91429b9/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursor.java index 322b3b4..a587b89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursor.java @@ -88,7 +88,6 @@ public class VisorQueryCursor<T> implements Iterator<T>, AutoCloseable { this.accessed = accessed; } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(VisorQueryCursor.class, this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a91429b9/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java index f5a2746..ebf62fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.visor.query; import org.apache.ignite.*; import org.apache.ignite.cache.query.*; -import org.apache.ignite.cluster.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.processors.timeout.*; @@ -33,7 +32,6 @@ import java.util.*; import java.util.concurrent.*; import static org.apache.ignite.internal.visor.query.VisorQueryUtils.*; -import static org.apache.ignite.internal.visor.util.VisorTaskUtils.*; /** * Job for execute SCAN or SQL query and get first page of results. @@ -62,41 +60,11 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten return cacheProcessor.jcache(cacheName); } - /** - * @return Query task class name. - */ - protected Class<? extends VisorQueryTask> task() { - return VisorQueryTask.class; - } - /** {@inheritDoc} */ @Override protected IgniteBiTuple<? extends Exception, VisorQueryResultEx> run(VisorQueryArg arg) { try { - String cacheName = arg.cacheName(); - UUID nid = ignite.localNode().id(); - // If node was not specified then we need to check if this node could be used for query - // or we need to send task to appropriate node. - if (arg.nodeId() == null) { - ClusterGroup prj = ignite.cluster().forDataNodes(cacheName); - - if (prj.node() == null) - throw new IgniteException("No data nodes for cache: " + escapeName(cacheName)); - - // Current node does not fit. - if (prj.node(nid) == null) { - Collection<ClusterNode> prjNodes = prj.nodes(); - - Collection<UUID> nids = new ArrayList<>(prjNodes.size()); - - for (ClusterNode node : prjNodes) - nids.add(node.id()); - - return ignite.compute(prj).withNoFailover().execute(task(), new VisorTaskArgument<>(nids, arg, false)); - } - } - boolean scan = arg.queryTxt().toUpperCase().startsWith("SCAN"); String qryId = (scan ? SCAN_QRY_NAME : SQL_QRY_NAME) + "-" + @@ -126,8 +94,8 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten else cur.close(); - return new IgniteBiTuple<>(null, new VisorQueryResultEx(ignite.localNode().id(), qryId, - SCAN_COL_NAMES, rows, hasNext, duration)); + return new IgniteBiTuple<>(null, new VisorQueryResultEx(nid, qryId, SCAN_COL_NAMES, rows, hasNext, + duration)); } else { SqlFieldsQuery qry = new SqlFieldsQuery(arg.queryTxt()); @@ -163,8 +131,7 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten else cur.close(); - return new IgniteBiTuple<>(null, new VisorQueryResultEx(ignite.localNode().id(), qryId, - names, rows, hasNext, duration)); + return new IgniteBiTuple<>(null, new VisorQueryResultEx(nid, qryId, names, rows, hasNext, duration)); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a91429b9/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java index 2ce011e..4f2fda5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java @@ -17,18 +17,10 @@ package org.apache.ignite.internal.visor.query; -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.visor.*; import org.apache.ignite.lang.*; -import java.util.*; - -import static org.apache.ignite.internal.visor.util.VisorTaskUtils.*; - /** * Task for execute SCAN or SQL query and get first page of results. */ @@ -38,42 +30,6 @@ public class VisorQueryTask extends VisorOneNodeTask<VisorQueryArg, IgniteBiTupl private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override protected Map<? extends ComputeJob, ClusterNode> map0(List<ClusterNode> subgrid, - VisorTaskArgument<VisorQueryArg> arg) { - String cache = taskArg.cacheName(); - - ClusterNode node; - - UUID nid = taskArg.nodeId(); - - IgniteClusterEx cluster = ignite.cluster(); - - if (nid == null) { - ClusterGroup prj = cluster.localNode().isDaemon() ? cluster.forRemotes() : cluster.forDataNodes(cache); - - if (prj.nodes().isEmpty()) - throw new IgniteException("No data nodes for cache: " + escapeName(cache)); - - // First try to take local node to avoid network hop. - node = prj.node(ignite.localNode().id()); - - // Take any node from projection. - if (node == null) - node = prj.forRandom().node(); - } - else { - node = cluster.node(nid); - - if (node == null) - throw new IgniteException("Node not found: " + nid); - } - - assert node != null; - - return Collections.singletonMap(job(taskArg), node); - } - - /** {@inheritDoc} */ @Override protected VisorQueryJob job(VisorQueryArg arg) { return new VisorQueryJob(arg, debug); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a91429b9/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/ack/VisorAckCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/ack/VisorAckCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/ack/VisorAckCommand.scala index 6faa276..a8e2802 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/ack/VisorAckCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/ack/VisorAckCommand.scala @@ -22,8 +22,6 @@ import org.apache.ignite.visor.VisorTag import org.apache.ignite.visor.commands.VisorConsoleCommand import org.apache.ignite.visor.visor._ -import java.util.{HashSet => JavaHashSet} - import org.apache.ignite.internal.visor.misc.VisorAckTask import scala.language.implicitConversions http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a91429b9/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala index 1d5a1f6..5603f8f 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala @@ -26,9 +26,7 @@ import org.apache.ignite.visor.commands.VisorTextTable import org.apache.ignite.visor.visor import visor._ -import scala.collection.JavaConversions._ import scala.language.reflectiveCalls -import scala.util.control.Breaks._ /** * ==Overview== @@ -85,7 +83,7 @@ class VisorCacheClearCommand { * * @param argLst Command arguments. */ - def clear(argLst: ArgList, node: Option[ClusterNode]) = breakable { + def clear(argLst: ArgList, node: Option[ClusterNode]) { val cacheArg = argValue("c", argLst) val cacheName = cacheArg match { @@ -95,32 +93,35 @@ class VisorCacheClearCommand { warn("Can't find cache variable with specified name: " + s, "Type 'cache' to see available cache variables.") - break() + return case Some(name) => name } - val n = projectionForNode(node).node() + val grp = groupForDataNode(node, cacheName) - if (n == null) - scold(messageNodeNotFound(node)).^^ + if (grp.nodes().isEmpty) + scold("Can't find nodes with specified cache: " + escapeName(cacheName)) + else { + try { + val t = VisorTextTable() - val t = VisorTextTable() + t #= ("Node ID8(@)", "Cache Size Before", "Cache Size After") - t #= ("Node ID8(@)", "Cache Size Before", "Cache Size After") + val nid = grp.forRandom().node().id() - try { - val res = executeOne(n.id(), classOf[VisorCacheClearTask], cacheName) + val res = executeOne(nid, classOf[VisorCacheClearTask], cacheName) - t += (nodeId8(n.id()), res.get1(), res.get2()) - } - catch { - case e: Throwable => scold(e.getMessage).^^ - } + t += (nodeId8(nid), res.get1(), res.get2()) - println("Cleared cache with name: " + escapeName(cacheName)) + println("Cleared cache with name: " + escapeName(cacheName)) - t.render() + t.render() + } + catch { + case e: Throwable => scold(e.getMessage) + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a91429b9/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala index e74cb2c..d5d89f0 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala @@ -495,7 +495,7 @@ class VisorCacheCommand { assert(node != null) try { - val prj = projectionForNode(node) + val prj = groupForNode(node) val nids = prj.nodes().map(_.id()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a91429b9/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala index e9c6f6c..f2b1924 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala @@ -136,17 +136,19 @@ class VisorCacheScanCommand { case Some(name) => name } - val n = projectionForNode(node).node() + val grp = groupForDataNode(node, cacheName) - if (n == null) { - scold(messageNodeNotFound(node)).^^ + if (grp.nodes().isEmpty) { + scold(messageNodeNotFound(node)) return } - val fullRes = + val nid = grp.forRandom().node().id() + + val firstPage = try - executeOne(n.id(), classOf[VisorQueryTask], new VisorQueryArg(null, cacheName, "SCAN", pageSize)) + executeOne(nid, classOf[VisorQueryTask], new VisorQueryArg(cacheName, "SCAN", pageSize)) match { case x if x.get1() != null => error(x.get1()) @@ -161,14 +163,14 @@ class VisorCacheScanCommand { return } - var res: VisorQueryResult = fullRes - - if (res.rows.isEmpty) { + if (firstPage.rows.isEmpty) { println("Cache: " + escapeName(cacheName) + " is empty") return } + var nextPage: VisorQueryResult = firstPage + def render() { println("Entries in cache: " + escapeName(cacheName)) @@ -176,19 +178,19 @@ class VisorCacheScanCommand { t #= ("Key Class", "Key", "Value Class", "Value") - res.rows.foreach(r => t += (r(0), r(1), r(2), r(3))) + nextPage.rows.foreach(r => t += (r(0), r(1), r(2), r(3))) t.render() } render() - while (res.hasMore) { + while (nextPage.hasMore) { ask("\nFetch more objects (y/n) [y]:", "y") match { case "y" | "Y" => try { - res = executeOne(fullRes.responseNodeId(), classOf[VisorQueryNextPageTask], - new IgniteBiTuple[String, Integer](fullRes.queryId(), pageSize)) + nextPage = executeOne(firstPage.responseNodeId(), classOf[VisorQueryNextPageTask], + new IgniteBiTuple[String, Integer](firstPage.queryId(), pageSize)) render() } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a91429b9/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheStopCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheStopCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheStopCommand.scala index 690ee13..56c2e79 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheStopCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheStopCommand.scala @@ -92,16 +92,24 @@ class VisorCacheStopCommand { case Some(name) => name } + val grp = groupForDataNode(node, cacheName) + + if (grp.nodes().isEmpty) { + warn("Can't find nodes with specified cache: " + escapeName(cacheName), + "Type 'cache' to see available cache names.") + + return + } + ask(s"Are you sure you want to stop cache: ${escapeName(cacheName)}? (y/n) [n]: ", "n") match { case "y" | "Y" => try { - executeRandom(classOf[VisorCacheStopTask], cacheName) + executeRandom(grp, classOf[VisorCacheStopTask], cacheName) println("Visor successfully stop cache: " + escapeName(cacheName)) } catch { - case e: Exception => - error(e) + case e: Exception => error(e) } case "n" | "N" => http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a91429b9/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala index d48e41d..c475f90 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala @@ -100,18 +100,22 @@ class VisorCacheSwapCommand { case Some(name) => name } - val n = projectionForNode(node).node() + val grp = groupForDataNode(node, cacheName) - if (n != null) { + if (grp.nodes().isEmpty) + scold(messageNodeNotFound(node, Some("Can't find nodes for cache: " + escapeName(cacheName)))) + else try { - val r = executeOne(n.id(), classOf[VisorCacheSwapBackupsTask], Collections.singleton(cacheName)).get(cacheName) + val nid = grp.forRandom().node().id() + + val r = executeOne(nid, classOf[VisorCacheSwapBackupsTask], Collections.singleton(cacheName)).get(cacheName) if (r != null) { val t = VisorTextTable() t #= ("Node ID8(@)", "Entries Swapped", "Cache Size Before", "Cache Size After") - t += (nodeId8(n.id()), r.get1() - r.get2(), r.get1(), r.get2()) + t += (nodeId8(nid), r.get1() - r.get2(), r.get1(), r.get2()) println("Swapped entries in cache: " + escapeName(cacheName)) @@ -123,9 +127,6 @@ class VisorCacheSwapCommand { catch { case e: Exception => scold(e.getMessage) } - } - else - scold(messageNodeNotFound(node, Some("Can't find nodes for cache: " + escapeName(cacheName)))) } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a91429b9/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/disco/VisorDiscoveryCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/disco/VisorDiscoveryCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/disco/VisorDiscoveryCommand.scala index 3dd3784..e58518b 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/disco/VisorDiscoveryCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/disco/VisorDiscoveryCommand.scala @@ -21,8 +21,6 @@ import org.apache.ignite.cluster.ClusterNode import org.apache.ignite.events.EventType._ import org.apache.ignite.internal.util.lang.{GridFunc => F} import org.apache.ignite.internal.util.{IgniteUtils => U} -import java.util.UUID - import org.apache.ignite.internal.visor.event.VisorGridDiscoveryEvent import org.apache.ignite.internal.visor.node.VisorNodeEventsCollectorTask import org.apache.ignite.internal.visor.node.VisorNodeEventsCollectorTask.VisorNodeEventsCollectorTaskArg http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a91429b9/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala index 7daaa71..f5c4ce5 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala @@ -260,14 +260,24 @@ object visor extends VisorTag { /** * @param node Optional node. - * @return Projection with specified node or projection with random node if specified node is `None`. + * @return Cluster group with specified node or projection with random node if specified node is `None`. */ - def projectionForNode(node: Option[ClusterNode]): ClusterGroup = node match { + def groupForNode(node: Option[ClusterNode]): ClusterGroup = node match { case Some(n) => ignite.cluster.forNode(n) case None => ignite.cluster.forRandom() } /** + * @param node Optional node. + * @param cacheName Cache name to take cluster group for. + * @return Cluster group with data nodes for specified cache or cluster group for specified node. + */ + def groupForDataNode(node: Option[ClusterNode], cacheName: String): ClusterGroup = node match { + case Some(n) => ignite.cluster.forNode(n) + case None => ignite.cluster.forNodeIds(executeRandom(classOf[VisorCacheNodesTask], cacheName)) + } + + /** * @param node Node. * @param msg Optional message. * @return Message about why node was not found. @@ -1863,7 +1873,7 @@ object visor extends VisorTag { def toTaskArgument[A](nids: Iterable[UUID], arg: A): VisorTaskArgument[A] = new VisorTaskArgument(new JavaHashSet(nids), arg, false) - private def execute[A, R, J](grp: ClusterGroup, task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = { + private def execute[A, R, J](grp: ClusterGroup, task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A): R = { if (grp.nodes().isEmpty) throw new ClusterGroupEmptyException("Topology is empty.") @@ -1881,10 +1891,24 @@ object visor extends VisorTag { * @tparam J Job class. * @return Task result. */ - def executeOne[A, R, J](nid: UUID, task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = + def executeOne[A, R, J](nid: UUID, task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A): R = execute(ignite.cluster.forNodeId(nid), task, arg) /** + * Execute task on random node from specified cluster group. + * + * @param grp Cluster group to take rundom node from + * @param task Task class + * @param arg Task argument. + * @tparam A Task argument type. + * @tparam R Task result type + * @tparam J Job class. + * @return Task result. + */ + def executeRandom[A, R, J](grp: ClusterGroup, task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A): R = + execute(grp.forRandom(), task, arg) + + /** * Execute task on random node. * * @param task Task class @@ -1894,7 +1918,7 @@ object visor extends VisorTag { * @tparam J Job class. * @return Task result. */ - def executeRandom[A, R, J](task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = + def executeRandom[A, R, J](task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A): R = execute(ignite.cluster.forRandom(), task, arg) /** @@ -1908,7 +1932,7 @@ object visor extends VisorTag { * @tparam J Job class. * @return Task result. */ - def executeMulti[A, R, J](nids: Iterable[UUID], task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = + def executeMulti[A, R, J](nids: Iterable[UUID], task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A): R = execute(ignite.cluster.forNodeIds(nids), task, arg) /** @@ -1921,7 +1945,7 @@ object visor extends VisorTag { * @tparam J Job class. * @return Task result. */ - def executeMulti[A, R, J](task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = + def executeMulti[A, R, J](task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A): R = execute(ignite.cluster.forRemotes(), task, arg) /**