#ignite-834: Convert cache size and clear operations from closure calls to compute jobs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/863d6bf6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/863d6bf6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/863d6bf6 Branch: refs/heads/ignite-834 Commit: 863d6bf69f75c2bf1a7571ffa76259544baf0539 Parents: 086553e Author: ivasilinets <ivasilin...@gridgain.com> Authored: Tue May 5 15:23:41 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Tue May 5 15:23:41 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 331 +++++-------------- .../GridDistributedCacheAdapter.java | 28 +- 2 files changed, 103 insertions(+), 256 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/863d6bf6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index d59e9cc..596705e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -25,7 +25,6 @@ import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; -import org.apache.ignite.internal.compute.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.affinity.*; import org.apache.ignite.internal.processors.cache.distributed.*; @@ -1123,32 +1122,31 @@ public abstract 
class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param clearCall Global clear callable object. * @throws IgniteCheckedException In case of cache could not be cleared on any of the nodes. */ - private void clearRemotes(long timeout, GlobalClearCallable clearCall) throws IgniteCheckedException { - try { - // Send job to remote nodes only. - Collection<ClusterNode> nodes = - ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes().nodes(); + private void clearRemotes(long timeout, final TopologyVersionAwareJob clearCall) throws IgniteCheckedException { + // Send job to remote nodes only. + ClusterGroup nodes = ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes(); - IgniteInternalFuture<Object> fut = null; + if (!nodes.nodes().isEmpty()) { + ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout); - if (!nodes.isEmpty()) { - ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout); + ctx.grid().context().task().setThreadContext(TC_SUBGRID, nodes); - fut = ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true); - } + ctx.grid().context().task().execute(new ComputeTaskAdapter<Object, Object>() { + /** {@inheritDoc} */ + @Nullable @Override public Map<? 
extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable Object arg) throws IgniteException { + Map<ComputeJob, ClusterNode> jobs = new HashMap(); + for (ClusterNode node : subgrid) + jobs.put(clearCall, node); - if (fut != null) - fut.get(); - } - catch (ClusterGroupEmptyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("All remote nodes left while cache clearLocally [cacheName=" + name() + "]"); - } - catch (ComputeTaskTimeoutCheckedException e) { - U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " + - "'networkTimeout' configuration property) [cacheName=" + name() + "]"); + return jobs; + } - throw e; + /** {@inheritDoc} */ + @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException { + return null; + } + }, null).get(); } } @@ -1161,26 +1159,28 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param clearCall Global clear callable object. * @return Future. 
*/ - private IgniteInternalFuture<?> clearAsync(GlobalClearCallable clearCall) { + private IgniteInternalFuture<?> clearAsync(final TopologyVersionAwareJob clearCall) { Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name(), true, true, false).nodes(); if (!nodes.isEmpty()) { - IgniteInternalFuture<Object> fut = - ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true); + ctx.grid().context().task().setThreadContext(TC_SUBGRID, nodes); - return fut.chain(new CX1<IgniteInternalFuture<Object>, Object>() { - @Override public Object applyx(IgniteInternalFuture<Object> fut) throws IgniteCheckedException { - try { - return fut.get(); - } - catch (ClusterGroupEmptyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("All remote nodes left while cache clearLocally [cacheName=" + name() + "]"); + return ctx.grid().context().task().execute(new ComputeTaskAdapter<Object, Object>() { + /** {@inheritDoc} */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable Object arg) throws IgniteException { + Map<ComputeJob, ClusterNode> jobs = new HashMap(); + for (ClusterNode node : subgrid) + jobs.put(clearCall, node); - return null; - } + return jobs; } - }); + + /** {@inheritDoc} */ + @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException { + return null; + } + }, null); } else return new GridFinishedFuture<>(); @@ -3567,7 +3567,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Integer> sizeAsync(CachePeekMode[] peekModes) { + @Override public IgniteInternalFuture<Integer> sizeAsync(final CachePeekMode[] peekModes) { assert peekModes != null; PeekModes modes = parsePeekModes(peekModes, true); @@ -3581,23 +3581,32 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (nodes.isEmpty()) return new 
GridFinishedFuture<>(0); - IgniteInternalFuture<Collection<Integer>> fut = - ctx.closures().broadcastNoFailover( - new SizeCallable(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), null, nodes); + ctx.grid().context().task().setThreadContext(TC_SUBGRID, nodes); + + return ctx.grid().context().task().execute(new ComputeTaskAdapter<Object, Integer>() { + + /** {@inheritDoc} */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable Object arg) throws IgniteException { + Map<ComputeJob, ClusterNode> jobs = new HashMap(); + for (ClusterNode node : subgrid) + jobs.put(new SizeCallable(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), node); - return fut.chain(new CX1<IgniteInternalFuture<Collection<Integer>>, Integer>() { - @Override public Integer applyx(IgniteInternalFuture<Collection<Integer>> fut) - throws IgniteCheckedException { - Collection<Integer> res = fut.get(); + return jobs; + } - int totalSize = 0; + /** {@inheritDoc} */ + @Nullable @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteException { + int size = 0; - for (Integer size : res) - totalSize += size; + for (ComputeJobResult res : results) { + if (res.getException() == null) + size += res.<Integer>getData(); + } - return totalSize; + return size; } - }); + }, null); } /** {@inheritDoc} */ @@ -3915,51 +3924,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** - * Gets cache global size (with or without backups). - * - * @param primaryOnly {@code True} if only primary sizes should be included. - * @return Global size. - * @throws IgniteCheckedException If internal task execution failed. - */ - private int globalSize(boolean primaryOnly) throws IgniteCheckedException { - try { - // Send job to remote nodes only. 
- Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes(); - - IgniteInternalFuture<Collection<Integer>> fut = null; - - if (!nodes.isEmpty()) { - ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, gridCfg.getNetworkTimeout()); - - fut = ctx.closures().broadcastNoFailover( - new GlobalSizeCallable(name(), ctx.affinity().affinityTopologyVersion(), primaryOnly), null, nodes); - } - - // Get local value. - int globalSize = primaryOnly ? primarySize() : size(); - - if (fut != null) { - for (Integer i : fut.get()) - globalSize += i; - } - - return globalSize; - } - catch (ClusterGroupEmptyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("All remote nodes left while cache clearLocally [cacheName=" + name() + "]"); - - return primaryOnly ? primarySize() : size(); - } - catch (ComputeTaskTimeoutCheckedException e) { - U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " + - "'networkTimeout' configuration property) [cacheName=" + name() + "]"); - - throw e; - } - } - - /** * @param op Cache operation. * @param <T> Return type. * @return Operation result. @@ -4900,41 +4864,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** - * Internal callable which performs clear operation on a cache with the given name. - */ - @GridInternal - private static abstract class GlobalClearCallable extends DelayedCallable implements Callable<Object> { - /** - * Empty constructor for serialization. - */ - public GlobalClearCallable() { - // No-op. - } - - /** - * @param cacheName Cache name. - * @param topVer Affinity topology version. 
- */ - protected GlobalClearCallable(String cacheName, AffinityTopologyVersion topVer) { - super(cacheName, topVer); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - super.writeExternal(out); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - super.readExternal(in); - } - } - - /** * Global clear all. */ @GridInternal - private static class GlobalClearAllCallable extends GlobalClearCallable { + private static class GlobalClearAllCallable extends TopologyVersionAwareJob { /** */ private static final long serialVersionUID = 0L; @@ -4954,9 +4887,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public Object call() throws Exception { - waitAffinityReadyFuture(); - + @Nullable @Override public Object localExecute() { ((IgniteEx)ignite).cachex(cacheName).clearLocally(); return null; @@ -4967,7 +4898,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * Global clear keys. 
*/ @GridInternal - private static class GlobalClearKeySetCallable<K, V> extends GlobalClearCallable { + private static class GlobalClearKeySetCallable<K, V> extends TopologyVersionAwareJob { /** */ private static final long serialVersionUID = 0L; @@ -4993,34 +4924,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public Object call() throws Exception { - waitAffinityReadyFuture(); - + @Nullable @Override public Object localExecute() { ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys); return null; } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - super.writeExternal(out); - - out.writeObject(keys); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - super.readExternal(in); - - keys = (Set<K>) in.readObject(); - } } /** * Internal callable for global size calculation. */ @GridInternal - private static class SizeCallable extends DelayedCallable implements IgniteClosure<Object, Integer> { + private static class SizeCallable extends TopologyVersionAwareJob { /** */ private static final long serialVersionUID = 0L; @@ -5046,9 +4961,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public Integer apply(Object o) { - waitAffinityReadyFuture(); - + @Nullable @Override public Object localExecute() { IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName); assert cache != null : cacheName; @@ -5062,89 +4975,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @SuppressWarnings("ForLoopReplaceableByForEach") - @Override public void writeExternal(ObjectOutput out) throws IOException { - super.writeExternal(out); - - out.writeInt(peekModes.length); - - for (int i = 0; i < peekModes.length; i++) - 
U.writeEnum(out, peekModes[i]); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - super.readExternal(in); - - int len = in.readInt(); - - peekModes = new CachePeekMode[len]; - - for (int i = 0; i < len; i++) - peekModes[i] = CachePeekMode.fromOrdinal(in.readByte()); - } - - /** {@inheritDoc} */ public String toString() { return S.toString(SizeCallable.class, this); } } /** - * Internal callable which performs {@link IgniteInternalCache#size()} or {@link IgniteInternalCache#primarySize()} - * operation on a cache with the given name. - */ - @GridInternal - private static class GlobalSizeCallable extends DelayedCallable implements IgniteClosure<Object, Integer>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Primary only flag. */ - private boolean primaryOnly; - - /** - * Empty constructor for serialization. - */ - public GlobalSizeCallable() { - // No-op. - } - - /** - * @param cacheName Cache name. - * @param topVer Affinity topology version. - * @param primaryOnly Primary only flag. - */ - private GlobalSizeCallable(String cacheName, AffinityTopologyVersion topVer, boolean primaryOnly) { - super(cacheName, topVer); - - this.primaryOnly = primaryOnly; - } - - /** {@inheritDoc} */ - @Override public Integer apply(Object o) { - waitAffinityReadyFuture(); - - IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName); - - return primaryOnly ? cache.primarySize() : cache.size(); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - super.writeExternal(out); - - out.writeBoolean(primaryOnly); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - super.readExternal(in); - - primaryOnly = in.readBoolean(); - } - } - - /** * Holder for last async operation future. 
*/ protected static class FutureHolder { @@ -5709,7 +5545,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Delayed callable class. */ - protected static abstract class DelayedCallable<K, V> implements Externalizable { + protected static abstract class TopologyVersionAwareJob<K, V> extends ComputeJobAdapter { /** */ private static final long serialVersionUID = 0L; @@ -5730,22 +5566,37 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Empty constructor for serialization. */ - public DelayedCallable() { + public TopologyVersionAwareJob() { // No-op. } /** + * @param cacheName Cache name. * @param topVer Affinity topology version. */ - public DelayedCallable(String cacheName, AffinityTopologyVersion topVer) { + public TopologyVersionAwareJob(String cacheName, AffinityTopologyVersion topVer) { this.cacheName = cacheName; this.topVer = topVer; } + /** {@inheritDoc} */ + @Nullable @Override public final Object execute() { + waitAffinityReadyFuture(); + + localExecute(); + + return null; + } + + /** + * @return Local execution result. + */ + @Nullable protected abstract Object localExecute(); + /** * Hold (suspend) job execution until our cache version becomes equal to remote cache's version. 
*/ - public void waitAffinityReadyFuture() { + private void waitAffinityReadyFuture() { GridCacheProcessor cacheProc = ((IgniteKernal) ignite).context().cache(); GridCacheAdapter<K, V> cacheAdapter = cacheProc.internalCache(cacheName); @@ -5768,21 +5619,5 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } } } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); - - topVer.writeExternal(out); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); - - topVer = new AffinityTopologyVersion(); - - topVer.readExternal(in); - } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/863d6bf6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 661df87..3a685cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; @@ -230,13 +231,23 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * operation on a cache with the given 
name. */ @GridInternal - private static class GlobalRemoveAllCallable<K,V> extends DelayedCallable<K, V> implements Callable<Object> { + private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable { /** */ private static final long serialVersionUID = 0L; + /** Cache name. */ + private String cacheName; + + /** Topology version. */ + private AffinityTopologyVersion topVer; + /** Skip store flag. */ private boolean skipStore; + /** Injected grid instance. */ + @IgniteInstanceResource + private Ignite ignite; + /** * Empty constructor for serialization. */ @@ -250,7 +261,8 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * @param skipStore Skip store flag. */ private GlobalRemoveAllCallable(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) { - super(cacheName, topVer); + this.cacheName = cacheName; + this.topVer = topVer; this.skipStore = skipStore; } @@ -258,11 +270,11 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * {@inheritDoc} */ @Override public Object call() throws Exception { - GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal) ignite).context().cache().internalCache(cacheName); + GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal)ignite).context().cache().internalCache(cacheName); final GridCacheContext<K, V> ctx = cacheAdapter.context(); - waitAffinityReadyFuture(); + ctx.affinity().affinityReadyFuture(topVer).get(); ctx.gate().enter(); @@ -326,15 +338,15 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - super.writeExternal(out); - + U.writeString(out, cacheName); + out.writeObject(topVer); out.writeBoolean(skipStore); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - super.readExternal(in); - + cacheName = 
U.readString(in); + topVer = (AffinityTopologyVersion)in.readObject(); skipStore = in.readBoolean(); } }