#ignite-834: IgniteCache.clearAll() throws NPE. #ignite-732: IgniteCache.size() should not fail in case of topology changes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/99c7e228 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/99c7e228 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/99c7e228 Branch: refs/heads/ignite-709_2 Commit: 99c7e228d12e25826f74d6d8706d158ec36004ed Parents: 9ff8029 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Wed May 6 12:30:57 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Wed May 6 12:30:57 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 511 +++++++++---------- .../resources/META-INF/classnames.properties | 6 +- 2 files changed, 248 insertions(+), 269 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/99c7e228/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 3f4e97b..6674993 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -21,10 +21,10 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; -import org.apache.ignite.internal.compute.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.affinity.*; import org.apache.ignite.internal.processors.cache.distributed.*; @@ -1083,7 +1083,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V // Clear local cache synchronously. clearLocally(); - clearRemotes(0, new GlobalClearAllCallable(name())); + clearRemotes(0, null); } /** {@inheritDoc} */ @@ -1091,7 +1091,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V // Clear local cache synchronously. clearLocally(key); - clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key))); + clearRemotes(0, Collections.singleton(key)); } /** {@inheritDoc} */ @@ -1099,83 +1099,55 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V // Clear local cache synchronously. clearLocallyAll(keys); - clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), keys)); + clearRemotes(0, keys); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync(K key) { - return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key))); + return clearKeysAsync(Collections.singleton(key)); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys) { - return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), keys)); + return clearKeysAsync(keys); } /** * @param timeout Timeout for clearLocally all task in milliseconds (0 for never). * Set it to larger value for large caches. - * @param clearCall Global clear callable object. + * @param keys Keys to clear or {@code null} if all cache should be cleared. * @throws IgniteCheckedException In case of cache could not be cleared on any of the nodes. */ - private void clearRemotes(long timeout, GlobalClearCallable clearCall) throws IgniteCheckedException { - try { - // Send job to remote nodes only. - Collection<ClusterNode> nodes = - ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes().nodes(); - - IgniteInternalFuture<Object> fut = null; - - if (!nodes.isEmpty()) { - ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout); + private void clearRemotes(long timeout, @Nullable final Set<? extends K> keys) throws IgniteCheckedException { + // Send job to remote nodes only. + Collection<ClusterNode> nodes = + ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes().nodes(); - fut = ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true); - } + if (!nodes.isEmpty()) { + ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout); - if (fut != null) - fut.get(); - } - catch (ClusterGroupEmptyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("All remote nodes left while cache clearLocally [cacheName=" + name() + "]"); - } - catch (ComputeTaskTimeoutCheckedException e) { - U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " + - "'networkTimeout' configuration property) [cacheName=" + name() + "]"); + ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - throw e; + ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null).get(); } } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync() { - return clearAsync(new GlobalClearAllCallable(name())); + return clearKeysAsync(null); } /** - * @param clearCall Global clear callable object. + * @param keys Keys to clear or {@code null} if all cache should be cleared. * @return Future. */ - private IgniteInternalFuture<?> clearAsync(GlobalClearCallable clearCall) { + private IgniteInternalFuture<?> clearKeysAsync(final Set<? extends K> keys) { Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name(), true, true, false).nodes(); if (!nodes.isEmpty()) { - IgniteInternalFuture<Object> fut = - ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true); + ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - return fut.chain(new CX1<IgniteInternalFuture<Object>, Object>() { - @Override public Object applyx(IgniteInternalFuture<Object> fut) throws IgniteCheckedException { - try { - return fut.get(); - } - catch (ClusterGroupEmptyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("All remote nodes left while cache clearLocally [cacheName=" + name() + "]"); - - return null; - } - } - }); + return ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null); } else return new GridFinishedFuture<>(); @@ -3562,7 +3534,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Integer> sizeAsync(CachePeekMode[] peekModes) { + @Override public IgniteInternalFuture<Integer> sizeAsync(final CachePeekMode[] peekModes) { assert peekModes != null; PeekModes modes = parsePeekModes(peekModes, true); @@ -3576,22 +3548,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (nodes.isEmpty()) return new GridFinishedFuture<>(0); - IgniteInternalFuture<Collection<Integer>> fut = - ctx.closures().broadcastNoFailover(new SizeCallable(ctx.name(), peekModes), null, nodes); - - return fut.chain(new CX1<IgniteInternalFuture<Collection<Integer>>, Integer>() { - @Override public Integer applyx(IgniteInternalFuture<Collection<Integer>> fut) - throws IgniteCheckedException { - Collection<Integer> res = fut.get(); + ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - int totalSize = 0; - - for (Integer size : res) - totalSize += size; - - return totalSize; - } - }); + return ctx.kernalContext().task().execute(new SizeTask(ctx, peekModes), null); } /** {@inheritDoc} */ @@ -3909,50 +3868,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** - * Gets cache global size (with or without backups). - * - * @param primaryOnly {@code True} if only primary sizes should be included. - * @return Global size. - * @throws IgniteCheckedException If internal task execution failed. - */ - private int globalSize(boolean primaryOnly) throws IgniteCheckedException { - try { - // Send job to remote nodes only. - Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes(); - - IgniteInternalFuture<Collection<Integer>> fut = null; - - if (!nodes.isEmpty()) { - ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, gridCfg.getNetworkTimeout()); - - fut = ctx.closures().broadcastNoFailover(new GlobalSizeCallable(name(), primaryOnly), null, nodes); - } - - // Get local value. - int globalSize = primaryOnly ? primarySize() : size(); - - if (fut != null) { - for (Integer i : fut.get()) - globalSize += i; - } - - return globalSize; - } - catch (ClusterGroupEmptyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("All remote nodes left while cache clearLocally [cacheName=" + name() + "]"); - - return primaryOnly ? primarySize() : size(); - } - catch (ComputeTaskTimeoutCheckedException e) { - U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " + - "'networkTimeout' configuration property) [cacheName=" + name() + "]"); - - throw e; - } - } - - /** * @param op Cache operation. * @param <T> Return type. * @return Operation result. @@ -4893,67 +4808,32 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** - * Internal callable which performs clear operation on a cache with the given name. - */ - @GridInternal - private static abstract class GlobalClearCallable implements Callable<Object>, Externalizable { - /** Cache name. */ - protected String cacheName; - - /** Injected grid instance. */ - @IgniteInstanceResource - protected Ignite ignite; - - /** - * Empty constructor for serialization. - */ - public GlobalClearCallable() { - // No-op. - } - - /** - * @param cacheName Cache name. - */ - protected GlobalClearCallable(String cacheName) { - this.cacheName = cacheName; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); - } - } - - /** * Global clear all. */ @GridInternal - private static class GlobalClearAllCallable extends GlobalClearCallable { + private static class GlobalClearAllJob extends TopologyVersionAwareJob { /** */ private static final long serialVersionUID = 0L; /** * Empty constructor for serialization. */ - public GlobalClearAllCallable() { + public GlobalClearAllJob() { // No-op. } /** * @param cacheName Cache name. + * @param topVer Affinity topology version. */ - private GlobalClearAllCallable(String cacheName) { - super(cacheName); + private GlobalClearAllJob(String cacheName, AffinityTopologyVersion topVer) { + super(cacheName, topVer); } /** {@inheritDoc} */ - @Override public Object call() throws Exception { - ((IgniteEx)ignite).cachex(cacheName).clearLocally(); + @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { + if (cache != null) + cache.clearLocally(); return null; } @@ -4963,7 +4843,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * Global clear keys. */ @GridInternal - private static class GlobalClearKeySetCallable<K, V> extends GlobalClearCallable { + private static class GlobalClearKeySetJob<K> extends TopologyVersionAwareJob { /** */ private static final long serialVersionUID = 0L; @@ -4973,166 +4853,75 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Empty constructor for serialization. */ - public GlobalClearKeySetCallable() { + public GlobalClearKeySetJob() { // No-op. } /** * @param cacheName Cache name. + * @param topVer Affinity topology version. * @param keys Keys to clear. */ - private GlobalClearKeySetCallable(String cacheName, Set<? extends K> keys) { - super(cacheName); + private GlobalClearKeySetJob(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) { + super(cacheName, topVer); this.keys = keys; } /** {@inheritDoc} */ - @Override public Object call() throws Exception { - ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys); + @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { + if (cache != null) + cache.clearLocallyAll(keys); return null; } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - super.writeExternal(out); - - out.writeObject(keys); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - super.readExternal(in); - - keys = (Set<K>) in.readObject(); - } } /** * Internal callable for global size calculation. */ @GridInternal - private static class SizeCallable extends IgniteClosureX<Object, Integer> implements Externalizable { + private static class SizeJob extends TopologyVersionAwareJob { /** */ private static final long serialVersionUID = 0L; - /** Cache name. */ - private String cacheName; - /** Peek modes. */ private CachePeekMode[] peekModes; - /** Injected grid instance. */ - @IgniteInstanceResource - private Ignite ignite; - /** * Required by {@link Externalizable}. */ - public SizeCallable() { + public SizeJob() { // No-op. } /** * @param cacheName Cache name. + * @param topVer Affinity topology version. * @param peekModes Cache peek modes. */ - private SizeCallable(String cacheName, CachePeekMode[] peekModes) { - this.cacheName = cacheName; - this.peekModes = peekModes; - } - - /** {@inheritDoc} */ - @Override public Integer applyx(Object o) throws IgniteCheckedException { - IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName); - - assert cache != null : cacheName; - - return cache.localSize(peekModes); - } - - /** {@inheritDoc} */ - @SuppressWarnings("ForLoopReplaceableByForEach") - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); - - out.writeInt(peekModes.length); + private SizeJob(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) { + super(cacheName, topVer); - for (int i = 0; i < peekModes.length; i++) - U.writeEnum(out, peekModes[i]); + this.peekModes = peekModes; } /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); - - int len = in.readInt(); + @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { + if (cache == null) + return 0; - peekModes = new CachePeekMode[len]; - - for (int i = 0; i < len; i++) - peekModes[i] = CachePeekMode.fromOrdinal(in.readByte()); + try { + return cache.localSize(peekModes); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } } /** {@inheritDoc} */ public String toString() { - return S.toString(SizeCallable.class, this); - } - } - - /** - * Internal callable which performs {@link IgniteInternalCache#size()} or {@link IgniteInternalCache#primarySize()} - * operation on a cache with the given name. - */ - @GridInternal - private static class GlobalSizeCallable implements IgniteClosure<Object, Integer>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Cache name. */ - private String cacheName; - - /** Primary only flag. */ - private boolean primaryOnly; - - /** Injected grid instance. */ - @IgniteInstanceResource - private Ignite ignite; - - /** - * Empty constructor for serialization. - */ - public GlobalSizeCallable() { - // No-op. - } - - /** - * @param cacheName Cache name. - * @param primaryOnly Primary only flag. - */ - private GlobalSizeCallable(String cacheName, boolean primaryOnly) { - this.cacheName = cacheName; - this.primaryOnly = primaryOnly; - } - - /** {@inheritDoc} */ - @Override public Integer apply(Object o) { - IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName); - - return primaryOnly ? cache.primarySize() : cache.size(); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); - out.writeBoolean(primaryOnly); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); - primaryOnly = in.readBoolean(); + return S.toString(SizeJob.class, this); } } @@ -5697,4 +5486,194 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V metrics.addPutAndGetTimeNanos(System.nanoTime() - start); } } + + /** + * Delayed callable class. + */ + protected static abstract class TopologyVersionAwareJob extends ComputeJobAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** Injected job context. */ + @JobContextResource + protected ComputeJobContext jobCtx; + + /** Injected grid instance. */ + @IgniteInstanceResource + protected Ignite ignite; + + /** Affinity topology version. */ + protected AffinityTopologyVersion topVer; + + /** Cache name. */ + protected String cacheName; + + /** + * Empty constructor for serialization. + */ + public TopologyVersionAwareJob() { + // No-op. + } + + /** + * @param cacheName Cache name. + * @param topVer Affinity topology version. + */ + public TopologyVersionAwareJob(String cacheName, AffinityTopologyVersion topVer) { + assert topVer != null; + + this.cacheName = cacheName; + this.topVer = topVer; + } + + /** {@inheritDoc} */ + @Nullable @Override public final Object execute() { + waitAffinityReadyFuture(); + + IgniteInternalCache cache = ((IgniteKernal)ignite).context().cache().cache(cacheName); + + return localExecute(cache); + } + + /** + * @param cache Cache. + * @return Local execution result. + */ + @Nullable protected abstract Object localExecute(@Nullable IgniteInternalCache cache); + + /** + * Holds (suspends) job execution until our cache version becomes equal to remote cache's version. + */ + private void waitAffinityReadyFuture() { + GridCacheProcessor cacheProc = ((IgniteKernal)ignite).context().cache(); + + AffinityTopologyVersion locTopVer = cacheProc.context().exchange().readyAffinityVersion(); + + if (locTopVer.compareTo(topVer) < 0) { + IgniteInternalFuture<?> fut = cacheProc.context().exchange().affinityReadyFuture(topVer); + + if (fut != null && !fut.isDone()) { + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> t) { + jobCtx.callcc(); + } + }); + + jobCtx.holdcc(); + } + } + } + } + + /** + * Size task. + */ + private static class SizeTask extends ComputeTaskAdapter<Object, Integer> { + /** */ + private static final long serialVersionUID = 0L; + + /** Cache context. */ + private GridCacheContext ctx; + + /** Peek modes. */ + private CachePeekMode[] peekModes; + + /** + * Empty constructor for serialization. + */ + public SizeTask() { + // No-op. + } + + /** + * @param ctx Cache context. + */ + public SizeTask(GridCacheContext ctx, CachePeekMode[] peekModes) { + this.ctx = ctx; + this.peekModes = peekModes; + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable Object arg) throws IgniteException { + Map<ComputeJob, ClusterNode> jobs = new HashMap(); + + for (ClusterNode node : subgrid) + jobs.put(new SizeJob(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), node); + + return jobs; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + return ComputeJobResultPolicy.WAIT; + } + + /** {@inheritDoc} */ + @Nullable @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteException { + int size = 0; + + for (ComputeJobResult res : results) { + if (res.getException() == null && res != null) + size += res.<Integer>getData(); + } + + return size; + } + } + + /** + * Clear task. + */ + private static class ClearTask<K> extends ComputeTaskAdapter<Object, Object> { + /** */ + private static final long serialVersionUID = 0L; + + /** Cache context. */ + private GridCacheContext ctx; + + /** Keys to clear. */ + private Set<? extends K> keys; + + /** + * Empty constructor for serialization. + */ + public ClearTask() { + // No-op. + } + + /** + * @param ctx Cache context. + * @param keys Keys to clear. + */ + public ClearTask(GridCacheContext ctx, Set<? extends K> keys) { + this.ctx = ctx; + this.keys = keys; + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable Object arg) throws IgniteException { + Map<ComputeJob, ClusterNode> jobs = new HashMap(); + + for (ClusterNode node : subgrid) { + jobs.put(keys == null ? + new GlobalClearAllJob(ctx.name(), ctx.affinity().affinityTopologyVersion()) : + new GlobalClearKeySetJob<K>(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys), + node); + } + + return jobs; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + return ComputeJobResultPolicy.WAIT; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException { + return null; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/99c7e228/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 35495ed..ff263cd 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -323,13 +323,13 @@ org.apache.ignite.internal.processors.cache.GridCacheAdapter$72 org.apache.ignite.internal.processors.cache.GridCacheAdapter$73 org.apache.ignite.internal.processors.cache.GridCacheAdapter$74 org.apache.ignite.internal.processors.cache.GridCacheAdapter$9 -org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearAllCallable +org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearAllJob org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearCallable -org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearKeySetCallable +org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearKeySetJob org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalSizeCallable org.apache.ignite.internal.processors.cache.GridCacheAdapter$LoadCacheClosure org.apache.ignite.internal.processors.cache.GridCacheAdapter$LoadKeysCallable -org.apache.ignite.internal.processors.cache.GridCacheAdapter$SizeCallable +org.apache.ignite.internal.processors.cache.GridCacheAdapter$SizeJob org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateGetTimeStatClosure org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutAndGetTimeStatClosure org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutTimeStatClosure