Repository: incubator-ignite Updated Branches: refs/heads/ignite-834 79e07233d -> 54eebf2c8
#ignite-834: move internal classes to static. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/54eebf2c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/54eebf2c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/54eebf2c Branch: refs/heads/ignite-834 Commit: 54eebf2c8750155fab83d71ce8de76e8cee28aab Parents: 79e0723 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Tue May 5 17:43:22 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Tue May 5 17:43:22 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 187 ++++++++++++------- 1 file changed, 115 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/54eebf2c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index b493812..6674993 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1128,29 +1128,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - ctx.kernalContext().task().execute(new ComputeTaskAdapter<Object, Object>() { - @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, - @Nullable Object arg) throws IgniteException { - Map<ComputeJob, ClusterNode> jobs = new HashMap(); - - for (ClusterNode node : subgrid) { - jobs.put(keys == null ? - new GlobalClearAllJob(name(), ctx.affinity().affinityTopologyVersion()) : - new GlobalClearKeySetJob<K>(name(), ctx.affinity().affinityTopologyVersion(), keys), - node); - } - - return jobs; - } - - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { - return ComputeJobResultPolicy.WAIT; - } - - @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException { - return null; - } - }, null).get(); + ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null).get(); } } @@ -1169,29 +1147,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (!nodes.isEmpty()) { ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - return ctx.kernalContext().task().execute(new ComputeTaskAdapter<Object, Object>() { - @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, - @Nullable Object arg) throws IgniteException { - Map<ComputeJob, ClusterNode> jobs = new HashMap(); - - for (ClusterNode node : subgrid) { - jobs.put(keys == null ? - new GlobalClearAllJob(name(), ctx.affinity().affinityTopologyVersion()) : - new GlobalClearKeySetJob<K>(name(), ctx.affinity().affinityTopologyVersion(), keys), - node); - } - - return jobs; - } - - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { - return ComputeJobResultPolicy.WAIT; - } - - @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException { - return null; - } - }, null); + return ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null); } else return new GridFinishedFuture<>(); @@ -3594,32 +3550,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - return ctx.kernalContext().task().execute(new ComputeTaskAdapter<Object, Integer>() { - @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, - @Nullable Object arg) throws IgniteException { - Map<ComputeJob, ClusterNode> jobs = new HashMap(); - - for (ClusterNode node : subgrid) - jobs.put(new SizeJob(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), node); - - return jobs; - } - - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { - return ComputeJobResultPolicy.WAIT; - } - - @Nullable @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteException { - int size = 0; - - for (ComputeJobResult res : results) { - if (res.getException() == null && res != null) - size += res.<Integer>getData(); - } - - return size; - } - }, null); + return ctx.kernalContext().task().execute(new SizeTask(ctx, peekModes), null); } /** {@inheritDoc} */ @@ -5633,4 +5564,116 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } } } + + /** + * Size task. + */ + private static class SizeTask extends ComputeTaskAdapter<Object, Integer> { + /** */ + private static final long serialVersionUID = 0L; + + /** Cache context. */ + private GridCacheContext ctx; + + /** Peek modes. */ + private CachePeekMode[] peekModes; + + /** + * Empty constructor for serialization. + */ + public SizeTask() { + // No-op. + } + + /** + * @param ctx Cache context. + */ + public SizeTask(GridCacheContext ctx, CachePeekMode[] peekModes) { + this.ctx = ctx; + this.peekModes = peekModes; + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable Object arg) throws IgniteException { + Map<ComputeJob, ClusterNode> jobs = new HashMap(); + + for (ClusterNode node : subgrid) + jobs.put(new SizeJob(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), node); + + return jobs; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + return ComputeJobResultPolicy.WAIT; + } + + /** {@inheritDoc} */ + @Nullable @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteException { + int size = 0; + + for (ComputeJobResult res : results) { + if (res.getException() == null && res != null) + size += res.<Integer>getData(); + } + + return size; + } + } + + /** + * Clear task. + */ + private static class ClearTask<K> extends ComputeTaskAdapter<Object, Object> { + /** */ + private static final long serialVersionUID = 0L; + + /** Cache context. */ + private GridCacheContext ctx; + + /** Keys to clear. */ + private Set<? extends K> keys; + + /** + * Empty constructor for serialization. + */ + public ClearTask() { + // No-op. + } + + /** + * @param ctx Cache context. + * @param keys Keys to clear. + */ + public ClearTask(GridCacheContext ctx, Set<? extends K> keys) { + this.ctx = ctx; + this.keys = keys; + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable Object arg) throws IgniteException { + Map<ComputeJob, ClusterNode> jobs = new HashMap(); + + for (ClusterNode node : subgrid) { + jobs.put(keys == null ? + new GlobalClearAllJob(ctx.name(), ctx.affinity().affinityTopologyVersion()) : + new GlobalClearKeySetJob<K>(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys), + node); + } + + return jobs; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + return ComputeJobResultPolicy.WAIT; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException { + return null; + } + } }