Repository: incubator-ignite Updated Branches: refs/heads/ignite-373 ceb283542 -> e50632cad
#ignite-373: Use tasks instead of callable in removeAll. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e50632ca Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e50632ca Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e50632ca Branch: refs/heads/ignite-373 Commit: e50632cad41ddd076d1e5bf65ec87d1aa3d06485 Parents: ceb2835 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Tue May 12 16:29:39 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Tue May 12 16:29:39 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 19 ++- .../GridDistributedCacheAdapter.java | 153 +++++++++++-------- 2 files changed, 99 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50632ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 3826bfa..0b7fa91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -4842,7 +4842,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { + @Nullable @Override public Object localExecute() { + IgniteInternalCache cache = ((IgniteKernal)ignite).context().cache().cache(cacheName); + if (cache != null) cache.clearLocally(); @@ -4880,7 +4882,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { + @Nullable @Override public Object localExecute() { + IgniteInternalCache cache = ((IgniteKernal)ignite).context().cache().cache(cacheName); + if (cache != null) cache.clearLocallyAll(keys); @@ -4918,7 +4922,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { + @Nullable @Override public Object localExecute() { + IgniteInternalCache cache = ((IgniteKernal)ignite).context().cache().cache(cacheName); + if (cache == null) return 0; @@ -5541,16 +5547,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V @Nullable @Override public final Object execute() { waitAffinityReadyFuture(); - IgniteInternalCache cache = ((IgniteKernal)ignite).context().cache().cache(cacheName); - - return localExecute(cache); + return localExecute(); } /** - * @param cache Cache. * @return Local execution result. */ - @Nullable protected abstract Object localExecute(@Nullable IgniteInternalCache cache); + @Nullable protected abstract Object localExecute(); /** * Holds (suspends) job execution until our cache version becomes equal to remote cache's version. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50632ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index d10ab56..cafa8b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.affinity.*; @@ -33,16 +34,14 @@ import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; import java.io.*; import java.util.*; -import java.util.concurrent.*; -import static org.apache.ignite.internal.GridClosureCallMode.*; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; +import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*; /** * Distributed cache implementation. @@ -144,10 +143,10 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter try { AffinityTopologyVersion topVer; - boolean removedAll; + Boolean rmvAll; do { - removedAll = true; + rmvAll = true; topVer = ctx.affinity().affinityTopologyVersion(); @@ -155,19 +154,12 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); if (!nodes.isEmpty()) { - CacheOperationContext opCtx = ctx.operationContextPerCall(); + ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - Collection<Boolean> results = ctx.closures().callAsyncNoFailover(BROADCAST, - Collections.singleton(new GlobalRemoveAllCallable<>(name(), topVer, - opCtx != null && opCtx.skipStore())), nodes, true).get(); - - for (Boolean res : results) { - if (res != null && !res) - removedAll = false; - } + rmvAll = ctx.kernalContext().task().execute(new RemoveAllTask(ctx), null).get(); } } - while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || !removedAll); + while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || rmvAll == null || !rmvAll); } catch (ClusterGroupEmptyCheckedException ignore) { if (log.isDebugEnabled()) @@ -194,19 +186,18 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); if (!nodes.isEmpty()) { - CacheOperationContext opCtx = ctx.operationContextPerCall(); + ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - IgniteInternalFuture<?> rmvFut = ctx.closures().callAsyncNoFailover(BROADCAST, - new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore()), nodes, true); + IgniteInternalFuture<Boolean> rmvAll = ctx.kernalContext().task().execute(new RemoveAllTask(ctx), null); - rmvFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut) { + rmvAll.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> fut) { try { - fut.get(); + Boolean res = fut.get(); AffinityTopologyVersion topVer0 = ctx.affinity().affinityTopologyVersion(); - if (topVer0.equals(topVer)) + if (topVer0.equals(topVer) && res != null && res) opFut.onDone(); else removeAllAsync(opFut, topVer0); @@ -238,31 +229,78 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter } /** - * Internal callable which performs remove all primary key mappings - * operation on a cache with the given name. + * Remove task. */ @GridInternal - private static class GlobalRemoveAllCallable<K,V> implements Callable<Boolean>, Externalizable { + private static class RemoveAllTask extends ComputeTaskAdapter<Object, Boolean> { /** */ private static final long serialVersionUID = 0L; - /** Cache name. */ - private String cacheName; + /** Cache context. */ + private GridCacheContext ctx; + + /** + * Empty constructor for serialization. + */ + public RemoveAllTask(){ + // No-op. + } + + /** + * @param ctx Cache context. + */ + public RemoveAllTask(GridCacheContext ctx) { + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable Object arg) throws IgniteException { + Map<ComputeJob, ClusterNode> jobs = new HashMap(); - /** Topology version. */ - private AffinityTopologyVersion topVer; + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + boolean skipStore = opCtx != null && opCtx.skipStore(); + + for (ClusterNode node : subgrid) { + jobs.put(new GlobalRemoveAllJob(ctx.name(), ctx.affinity().affinityTopologyVersion(), skipStore), + node); + } + + return jobs; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + return ComputeJobResultPolicy.WAIT; + } + + /** {@inheritDoc} */ + @Nullable @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteException { + for (ComputeJobResult locRes : results) { + if (locRes != null && (locRes.getException() != null || !locRes.<Boolean>getData())) + return false; + } + + return true; + } + } + /** + * Internal job which performs remove all primary key mappings + * operation on a cache with the given name. + */ + @GridInternal + private static class GlobalRemoveAllJob<K,V> extends TopologyVersionAwareJob { + /** */ + private static final long serialVersionUID = 0L; /** Skip store flag. */ private boolean skipStore; - /** Injected grid instance. */ - @IgniteInstanceResource - private Ignite ignite; - /** * Empty constructor for serialization. */ - public GlobalRemoveAllCallable() { + public GlobalRemoveAllJob() { // No-op. } @@ -271,24 +309,20 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * @param topVer Topology version. * @param skipStore Skip store flag. */ - private GlobalRemoveAllCallable(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) { - this.cacheName = cacheName; - this.topVer = topVer; + private GlobalRemoveAllJob(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) { + super(cacheName, topVer); + this.skipStore = skipStore; } - /** - * {@inheritDoc} - */ - @Override public Boolean call() throws Exception { - GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal)ignite).context().cache().internalCache(cacheName); + /** {@inheritDoc} */ + @Nullable @Override public Object localExecute() { + GridCacheAdapter cache = ((IgniteKernal) ignite).context().cache().internalCache(cacheName); - if (cacheAdapter == null) + if (cache == null) return false; - final GridCacheContext<K, V> ctx = cacheAdapter.context(); - - ctx.affinity().affinityReadyFuture(topVer).get(); + final GridCacheContext<K, V> ctx = cache.context(); ctx.gate().enter(); @@ -299,16 +333,16 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter GridDhtCacheAdapter<K, V> dht; GridNearCacheAdapter<K, V> near = null; - if (cacheAdapter instanceof GridNearCacheAdapter) { - near = ((GridNearCacheAdapter<K, V>)cacheAdapter); + if (cache instanceof GridNearCacheAdapter) { + near = ((GridNearCacheAdapter<K, V>) cache); dht = near.dht(); } else - dht = (GridDhtCacheAdapter<K, V>)cacheAdapter; + dht = (GridDhtCacheAdapter<K, V>) cache; try (DataStreamerImpl<KeyCacheObject, Object> dataLdr = - (DataStreamerImpl)ignite.dataStreamer(cacheName)) { - ((DataStreamerImpl)dataLdr).maxRemapCount(0); + (DataStreamerImpl) ignite.dataStreamer(cacheName)) { + ((DataStreamerImpl) dataLdr).maxRemapCount(0); dataLdr.skipStore(skipStore); @@ -351,25 +385,14 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter } } } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } finally { ctx.gate().leave(); } return true; } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); - out.writeObject(topVer); - out.writeBoolean(skipStore); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); - topVer = (AffinityTopologyVersion)in.readObject(); - skipStore = in.readBoolean(); - } } }