Repository: incubator-ignite Updated Branches: refs/heads/ignite-834 a460b2e79 -> 9be603c17
#ignite-834: Small change Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9be603c1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9be603c1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9be603c1 Branch: refs/heads/ignite-834 Commit: 9be603c1735fb789bf2b2d89b862a1ad0d14ffd2 Parents: a460b2e Author: ivasilinets <ivasilin...@gridgain.com> Authored: Tue May 5 16:07:53 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Tue May 5 16:07:53 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 37 +++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9be603c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index b29cabd..efa5fb5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1083,7 +1083,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V // Clear local cache synchronously. clearLocally(); - clearRemotes(0, new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion())); + clearRemotes(0, null); } /** {@inheritDoc} */ @@ -1091,8 +1091,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V // Clear local cache synchronously. clearLocally(key); - clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), - Collections.singleton(key))); + clearRemotes(0, Collections.singleton(key)); } /** {@inheritDoc} */ @@ -1100,44 +1099,45 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V // Clear local cache synchronously. clearLocallyAll(keys); - clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), - keys)); + clearRemotes(0, keys); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync(K key) { - return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), - Collections.singleton(key))); + return clearKeysAsync(Collections.singleton(key)); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys) { - return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), - keys)); + return clearKeysAsync(keys); } /** * @param timeout Timeout for clearLocally all task in milliseconds (0 for never). * Set it to larger value for large caches. - * @param clearCall Global clear callable object. + * @param keys Keys to clear. * @throws IgniteCheckedException In case of cache could not be cleared on any of the nodes. */ - private void clearRemotes(long timeout, final TopologyVersionAwareJob clearCall) throws IgniteCheckedException { + private void clearRemotes(long timeout, final Set<? extends K> keys) throws IgniteCheckedException { // Send job to remote nodes only. ClusterGroup nodes = ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes(); if (!nodes.nodes().isEmpty()) { ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout); - ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); + ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes.nodes()); ctx.kernalContext().task().execute(new ComputeTaskAdapter<Object, Object>() { /** {@inheritDoc} */ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Object arg) throws IgniteException { Map<ComputeJob, ClusterNode> jobs = new HashMap(); + for (ClusterNode node : subgrid) - jobs.put(clearCall, node); + jobs.put(keys == null ? + new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion()) : + new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), keys), + node); return jobs; } @@ -1152,14 +1152,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync() { - return clearAsync(new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion())); + return clearKeysAsync(null); } /** - * @param clearCall Global clear callable object. + * @param keys Keys to clear. * @return Future. */ - private IgniteInternalFuture<?> clearAsync(final TopologyVersionAwareJob clearCall) { + private IgniteInternalFuture<?> clearKeysAsync(final Set<? extends K> keys) { Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name(), true, true, false).nodes(); if (!nodes.isEmpty()) { @@ -1171,7 +1171,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V @Nullable Object arg) throws IgniteException { Map<ComputeJob, ClusterNode> jobs = new HashMap(); for (ClusterNode node : subgrid) - jobs.put(clearCall, node); + jobs.put(keys == null ? + new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion()) : + new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), keys), + node); return jobs; }