Repository: incubator-ignite Updated Branches: refs/heads/ignite-834 9be603c17 -> 79e07233d
# ignite-834 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/79e07233 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/79e07233 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/79e07233 Branch: refs/heads/ignite-834 Commit: 79e07233deead23f420ad4ced2ef1d715a8a1ca5 Parents: 9be603c Author: sboikov <sboi...@gridgain.com> Authored: Tue May 5 17:10:15 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue May 5 17:10:15 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 112 ++++++++++--------- .../resources/META-INF/classnames.properties | 6 +- 2 files changed, 65 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79e07233/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index efa5fb5..b493812 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1115,34 +1115,38 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @param timeout Timeout for clearLocally all task in milliseconds (0 for never). * Set it to larger value for large caches. - * @param keys Keys to clear. + * @param keys Keys to clear or {@code null} if all cache should be cleared. * @throws IgniteCheckedException In case of cache could not be cleared on any of the nodes. */ - private void clearRemotes(long timeout, final Set<? extends K> keys) throws IgniteCheckedException { + private void clearRemotes(long timeout, @Nullable final Set<? extends K> keys) throws IgniteCheckedException { // Send job to remote nodes only. - ClusterGroup nodes = ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes(); + Collection<ClusterNode> nodes = + ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes().nodes(); - if (!nodes.nodes().isEmpty()) { + if (!nodes.isEmpty()) { ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout); - ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes.nodes()); + ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); ctx.kernalContext().task().execute(new ComputeTaskAdapter<Object, Object>() { - /** {@inheritDoc} */ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Object arg) throws IgniteException { Map<ComputeJob, ClusterNode> jobs = new HashMap(); - for (ClusterNode node : subgrid) + for (ClusterNode node : subgrid) { jobs.put(keys == null ? - new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion()) : - new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), keys), + new GlobalClearAllJob(name(), ctx.affinity().affinityTopologyVersion()) : + new GlobalClearKeySetJob<K>(name(), ctx.affinity().affinityTopologyVersion(), keys), node); + } return jobs; } - /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + return ComputeJobResultPolicy.WAIT; + } + @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException { return null; } @@ -1156,7 +1160,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** - * @param keys Keys to clear. + * @param keys Keys to clear or {@code null} if all cache should be cleared. * @return Future. */ private IgniteInternalFuture<?> clearKeysAsync(final Set<? extends K> keys) { @@ -1166,20 +1170,24 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); return ctx.kernalContext().task().execute(new ComputeTaskAdapter<Object, Object>() { - /** {@inheritDoc} */ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, - @Nullable Object arg) throws IgniteException { + @Nullable Object arg) throws IgniteException { Map<ComputeJob, ClusterNode> jobs = new HashMap(); - for (ClusterNode node : subgrid) + + for (ClusterNode node : subgrid) { jobs.put(keys == null ? - new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion()) : - new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), keys), + new GlobalClearAllJob(name(), ctx.affinity().affinityTopologyVersion()) : + new GlobalClearKeySetJob<K>(name(), ctx.affinity().affinityTopologyVersion(), keys), node); + } return jobs; } - /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + return ComputeJobResultPolicy.WAIT; + } + @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException { return null; } @@ -3587,18 +3595,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); return ctx.kernalContext().task().execute(new ComputeTaskAdapter<Object, Integer>() { - - /** {@inheritDoc} */ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Object arg) throws IgniteException { Map<ComputeJob, ClusterNode> jobs = new HashMap(); + for (ClusterNode node : subgrid) - jobs.put(new SizeCallable(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), node); + jobs.put(new SizeJob(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), node); return jobs; } - /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + return ComputeJobResultPolicy.WAIT; + } + @Nullable @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteException { int size = 0; @@ -4870,14 +4880,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * Global clear all. */ @GridInternal - private static class GlobalClearAllCallable extends TopologyVersionAwareJob { + private static class GlobalClearAllJob extends TopologyVersionAwareJob { /** */ private static final long serialVersionUID = 0L; /** * Empty constructor for serialization. */ - public GlobalClearAllCallable() { + public GlobalClearAllJob() { // No-op. } @@ -4885,13 +4895,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param cacheName Cache name. * @param topVer Affinity topology version. */ - private GlobalClearAllCallable(String cacheName, AffinityTopologyVersion topVer) { + private GlobalClearAllJob(String cacheName, AffinityTopologyVersion topVer) { super(cacheName, topVer); } /** {@inheritDoc} */ - @Nullable @Override public Object localExecute() { - ((IgniteEx)ignite).cachex(cacheName).clearLocally(); + @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { + if (cache != null) + cache.clearLocally(); return null; } @@ -4901,7 +4912,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * Global clear keys. */ @GridInternal - private static class GlobalClearKeySetCallable<K, V> extends TopologyVersionAwareJob { + private static class GlobalClearKeySetJob<K> extends TopologyVersionAwareJob { /** */ private static final long serialVersionUID = 0L; @@ -4911,7 +4922,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Empty constructor for serialization. */ - public GlobalClearKeySetCallable() { + public GlobalClearKeySetJob() { // No-op. } @@ -4920,15 +4931,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param topVer Affinity topology version. * @param keys Keys to clear. */ - private GlobalClearKeySetCallable(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) { + private GlobalClearKeySetJob(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) { super(cacheName, topVer); this.keys = keys; } /** {@inheritDoc} */ - @Nullable @Override public Object localExecute() { - ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys); + @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { + if (cache != null) + cache.clearLocallyAll(keys); return null; } @@ -4938,7 +4950,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * Internal callable for global size calculation. */ @GridInternal - private static class SizeCallable extends TopologyVersionAwareJob { + private static class SizeJob extends TopologyVersionAwareJob { /** */ private static final long serialVersionUID = 0L; @@ -4948,7 +4960,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Required by {@link Externalizable}. */ - public SizeCallable() { + public SizeJob() { // No-op. } @@ -4957,17 +4969,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param topVer Affinity topology version. * @param peekModes Cache peek modes. */ - private SizeCallable(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) { + private SizeJob(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) { super(cacheName, topVer); this.peekModes = peekModes; } /** {@inheritDoc} */ - @Nullable @Override public Object localExecute() { - IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName); - - assert cache != null : cacheName; + @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { + if (cache == null) + return 0; try { return cache.localSize(peekModes); @@ -4979,7 +4990,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ public String toString() { - return S.toString(SizeCallable.class, this); + return S.toString(SizeJob.class, this); } } @@ -5548,11 +5559,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Delayed callable class. */ - protected static abstract class TopologyVersionAwareJob<K, V> extends ComputeJobAdapter { + protected static abstract class TopologyVersionAwareJob extends ComputeJobAdapter { /** */ private static final long serialVersionUID = 0L; - /** Auto-inject job context. */ + /** Injected job context. */ @JobContextResource protected ComputeJobContext jobCtx; @@ -5578,6 +5589,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param topVer Affinity topology version. */ public TopologyVersionAwareJob(String cacheName, AffinityTopologyVersion topVer) { + assert topVer != null; + this.cacheName = cacheName; this.topVer = topVer; } @@ -5586,25 +5599,24 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V @Nullable @Override public final Object execute() { waitAffinityReadyFuture(); - return localExecute(); + IgniteInternalCache cache = ((IgniteKernal)ignite).context().cache().cache(cacheName); + + return localExecute(cache); } /** + * @param cache Cache. * @return Local execution result. */ - @Nullable protected abstract Object localExecute(); + @Nullable protected abstract Object localExecute(@Nullable IgniteInternalCache cache); /** - * Hold (suspend) job execution until our cache version becomes equal to remote cache's version. + * Holds (suspends) job execution until our cache version becomes equal to remote cache's version. */ private void waitAffinityReadyFuture() { - GridCacheProcessor cacheProc = ((IgniteKernal) ignite).context().cache(); - - GridCacheAdapter<K, V> cacheAdapter = cacheProc.internalCache(cacheName); - - final GridCacheContext<K, V> ctx = cacheAdapter.context(); + GridCacheProcessor cacheProc = ((IgniteKernal)ignite).context().cache(); - AffinityTopologyVersion locTopVer = ctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion locTopVer = cacheProc.context().exchange().readyAffinityVersion(); if (locTopVer.compareTo(topVer) < 0) { IgniteInternalFuture<?> fut = cacheProc.context().exchange().affinityReadyFuture(topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79e07233/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 35495ed..ff263cd 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -323,13 +323,13 @@ org.apache.ignite.internal.processors.cache.GridCacheAdapter$72 org.apache.ignite.internal.processors.cache.GridCacheAdapter$73 org.apache.ignite.internal.processors.cache.GridCacheAdapter$74 org.apache.ignite.internal.processors.cache.GridCacheAdapter$9 -org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearAllCallable +org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearAllJob org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearCallable -org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearKeySetCallable +org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearKeySetJob org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalSizeCallable org.apache.ignite.internal.processors.cache.GridCacheAdapter$LoadCacheClosure org.apache.ignite.internal.processors.cache.GridCacheAdapter$LoadKeysCallable -org.apache.ignite.internal.processors.cache.GridCacheAdapter$SizeCallable +org.apache.ignite.internal.processors.cache.GridCacheAdapter$SizeJob org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateGetTimeStatClosure org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutAndGetTimeStatClosure org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutTimeStatClosure