#ignite-732: IgniteCache.size() should not fail in case of topology changes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/139aa270 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/139aa270 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/139aa270 Branch: refs/heads/ignite-gg-9819 Commit: 139aa270ae61494c0757867f2dc531ec7251b1da Parents: 0885ac0 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Thu Apr 30 18:43:56 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Thu Apr 30 18:43:56 2015 +0300 ---------------------------------------------------------------------- .../ignite/compute/ComputeTaskAdapter.java | 14 +- .../processors/cache/GridCacheAdapter.java | 503 ++++++++++--------- .../processors/cache/GridCacheProcessor.java | 109 ++-- 3 files changed, 349 insertions(+), 277 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/139aa270/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java index c2ad198..87081fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java @@ -24,15 +24,16 @@ import java.util.*; /** * Convenience adapter for {@link ComputeTask} interface. Here is an example of - * how {@code GridComputeTaskAdapter} can be used: + * how {@code ComputeTaskAdapter} can be used: * <pre name="code" class="java"> - * public class MyFooBarTask extends GridComputeTaskAdapter<String, String> { + * public class MyFooBarTask extends ComputeTaskAdapter<String, String> { * // Inject load balancer. * @LoadBalancerResource * ComputeLoadBalancer balancer; * * // Map jobs to grid nodes. - * public Map<? extends ComputeJob, GridNode> map(List<GridNode> subgrid, String arg) throws IgniteCheckedException { + * public Map<? extends ComputeJob, GridNode> map(List<GridNode> subgrid, String arg) + * throws IgniteCheckedException { * Map<MyFooBarJob, GridNode> jobs = new HashMap<MyFooBarJob, GridNode>(subgrid.size()); * * // In more complex cases, you can actually do @@ -76,8 +77,8 @@ public abstract class ComputeTaskAdapter<T, R> implements ComputeTask<T, R> { * <p> * If remote job resulted in exception ({@link ComputeJobResult#getException()} is not {@code null}), * then {@link ComputeJobResultPolicy#FAILOVER} policy will be returned if the exception is instance - * of {@link org.apache.ignite.cluster.ClusterTopologyException} or {@link ComputeExecutionRejectedException}, which means that - * remote node either failed or job execution was rejected before it got a chance to start. In all + * of {@link org.apache.ignite.cluster.ClusterTopologyException} or {@link ComputeExecutionRejectedException}, + * which means that remote node either failed or job execution was rejected before it got a chance to start. In all * other cases the exception will be rethrown which will ultimately cause task to fail. * * @param res Received remote grid executable result. @@ -87,7 +88,8 @@ public abstract class ComputeTaskAdapter<T, R> implements ComputeTask<T, R> { * @throws IgniteException If handling a job result caused an error effectively rejecting * a failover. This exception will be thrown out of {@link ComputeTaskFuture#get()} method. */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException { + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) + throws IgniteException { IgniteException e = res.getException(); // Try to failover if result is failed. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/139aa270/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 3f4e97b..39f19b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -75,6 +75,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** */ private static final long serialVersionUID = 0L; + /** Failed result. */ + private static final Object FAIL = new Integer(-1); + /** clearLocally() split threshold. */ public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000; @@ -882,7 +885,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public Set<Cache.Entry<K, V>> entrySet() { - return entrySet((CacheEntryPredicate[]) null); + return entrySet((CacheEntryPredicate[])null); } /** {@inheritDoc} */ @@ -897,17 +900,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public Set<K> keySet() { - return keySet((CacheEntryPredicate[]) null); + return keySet((CacheEntryPredicate[])null); } /** {@inheritDoc} */ @Override public Set<K> primaryKeySet() { - return primaryKeySet((CacheEntryPredicate[]) null); + return primaryKeySet((CacheEntryPredicate[])null); } /** {@inheritDoc} */ @Override public Collection<V> values() { - return values((CacheEntryPredicate[]) null); + return values((CacheEntryPredicate[])null); } /** @@ -1080,36 +1083,31 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public void clear() throws IgniteCheckedException { - // Clear local cache synchronously. - clearLocally(); - - clearRemotes(0, new GlobalClearAllCallable(name())); + clearAll(0, new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion())); } /** {@inheritDoc} */ @Override public void clear(K key) throws IgniteCheckedException { - // Clear local cache synchronously. - clearLocally(key); - - clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key))); + clearAll(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), + Collections.singleton(key))); } /** {@inheritDoc} */ @Override public void clearAll(Set<? extends K> keys) throws IgniteCheckedException { - // Clear local cache synchronously. - clearLocallyAll(keys); - - clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), keys)); + clearAll(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), + keys)); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync(K key) { - return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key))); + return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), + Collections.singleton(key))); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys) { - return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), keys)); + return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), + keys)); } /** @@ -1118,19 +1116,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param clearCall Global clear callable object. * @throws IgniteCheckedException In case of cache could not be cleared on any of the nodes. */ - private void clearRemotes(long timeout, GlobalClearCallable clearCall) throws IgniteCheckedException { + private void clearAll(long timeout, TopologyVersionAwareCallable clearCall) throws IgniteCheckedException { try { - // Send job to remote nodes only. - Collection<ClusterNode> nodes = - ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes().nodes(); - IgniteInternalFuture<Object> fut = null; - if (!nodes.isEmpty()) { - ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout); + ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout); - fut = ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true); - } + fut = new ClearFuture(ctx, clearCall); if (fut != null) fut.get(); @@ -1149,19 +1141,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync() { - return clearAsync(new GlobalClearAllCallable(name())); + return clearAsync(new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion())); } /** * @param clearCall Global clear callable object. * @return Future. */ - private IgniteInternalFuture<?> clearAsync(GlobalClearCallable clearCall) { + private IgniteInternalFuture<?> clearAsync(TopologyVersionAwareCallable clearCall) { Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name(), true, true, false).nodes(); if (!nodes.isEmpty()) { - IgniteInternalFuture<Object> fut = - ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true); + IgniteInternalFuture<Object> fut = new ClearFuture(ctx, clearCall); return fut.chain(new CX1<IgniteInternalFuture<Object>, Object>() { @Override public Object applyx(IgniteInternalFuture<Object> fut) throws IgniteCheckedException { @@ -2117,7 +2108,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V throws IgniteCheckedException { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() { - @Override public EntryProcessor apply(K k) { + @Override public EntryProcessor apply(K k) { return entryProcessor; } }); @@ -2145,7 +2136,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp() { @Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter tx) { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = - Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor); + Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor); return tx.invokeAsync(ctx, invokeMap, args); } @@ -2371,7 +2362,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() { @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()) - .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL); + .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL); } @Override public String toString() { @@ -2526,7 +2517,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return asyncOp(new AsyncOp<Boolean>() { @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).chain( - (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG); + (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); } @Override public String toString() { @@ -2915,7 +2906,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (ctx.deploymentEnabled()) ctx.deploy().registerClass(oldVal); - return (GridCacheReturn) tx.putAllAsync(ctx, + return tx.putAllAsync(ctx, F.t(key, newVal), true, null, @@ -3017,7 +3008,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.deploy().registerClass(val); return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, - ctx.equalsValArray(val)).get().success(); + ctx.equalsValArray(val)).get().success(); } @Override public String toString() { @@ -3230,10 +3221,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V TransactionConfiguration cfg = ctx.gridConfig().getTransactionConfiguration(); return txStart( - concurrency, - isolation, - cfg.getDefaultTxTimeout(), - 0 + concurrency, + isolation, + cfg.getDefaultTxTimeout(), + 0 ); } @@ -3576,22 +3567,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (nodes.isEmpty()) return new GridFinishedFuture<>(0); - IgniteInternalFuture<Collection<Integer>> fut = - ctx.closures().broadcastNoFailover(new SizeCallable(ctx.name(), peekModes), null, nodes); - - return fut.chain(new CX1<IgniteInternalFuture<Collection<Integer>>, Integer>() { - @Override public Integer applyx(IgniteInternalFuture<Collection<Integer>> fut) - throws IgniteCheckedException { - Collection<Integer> res = fut.get(); - - int totalSize = 0; - - for (Integer size : res) - totalSize += size; - - return totalSize; - } - }); + return new SizeFuture(peekModes, ctx, modes.near); } /** {@inheritDoc} */ @@ -3675,7 +3651,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return F.iterator(iterator(), new IgniteClosure<Cache.Entry<K, V>, Cache.Entry<K, V>>() { private IgniteCacheExpiryPolicy expiryPlc = - ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null); + ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null); @Override public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry) { CacheOperationContext prev = ctx.gate().enter(opCtx); @@ -3909,50 +3885,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** - * Gets cache global size (with or without backups). - * - * @param primaryOnly {@code True} if only primary sizes should be included. - * @return Global size. - * @throws IgniteCheckedException If internal task execution failed. - */ - private int globalSize(boolean primaryOnly) throws IgniteCheckedException { - try { - // Send job to remote nodes only. - Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes(); - - IgniteInternalFuture<Collection<Integer>> fut = null; - - if (!nodes.isEmpty()) { - ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, gridCfg.getNetworkTimeout()); - - fut = ctx.closures().broadcastNoFailover(new GlobalSizeCallable(name(), primaryOnly), null, nodes); - } - - // Get local value. - int globalSize = primaryOnly ? primarySize() : size(); - - if (fut != null) { - for (Integer i : fut.get()) - globalSize += i; - } - - return globalSize; - } - catch (ClusterGroupEmptyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("All remote nodes left while cache clearLocally [cacheName=" + name() + "]"); - - return primaryOnly ? primarySize() : size(); - } - catch (ComputeTaskTimeoutCheckedException e) { - U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " + - "'networkTimeout' configuration property) [cacheName=" + name() + "]"); - - throw e; - } - } - - /** * @param op Cache operation. * @param <T> Return type. * @return Operation result. @@ -4893,47 +4825,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** - * Internal callable which performs clear operation on a cache with the given name. - */ - @GridInternal - private static abstract class GlobalClearCallable implements Callable<Object>, Externalizable { - /** Cache name. */ - protected String cacheName; - - /** Injected grid instance. */ - @IgniteInstanceResource - protected Ignite ignite; - - /** - * Empty constructor for serialization. - */ - public GlobalClearCallable() { - // No-op. - } - - /** - * @param cacheName Cache name. - */ - protected GlobalClearCallable(String cacheName) { - this.cacheName = cacheName; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); - } - } - - /** * Global clear all. */ @GridInternal - private static class GlobalClearAllCallable extends GlobalClearCallable { + private static class GlobalClearAllCallable extends TopologyVersionAwareCallable { /** */ private static final long serialVersionUID = 0L; @@ -4946,24 +4841,30 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @param cacheName Cache name. + * @param topVer Affinity topology version. */ - private GlobalClearAllCallable(String cacheName) { - super(cacheName); + private GlobalClearAllCallable(String cacheName, AffinityTopologyVersion topVer) { + super(cacheName, topVer); } /** {@inheritDoc} */ - @Override public Object call() throws Exception { + @Override protected Object callLocal() { ((IgniteEx)ignite).cachex(cacheName).clearLocally(); return null; } + + /** {@inheritDoc} */ + @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) { + return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, false).nodes(); + } } /** * Global clear keys. */ @GridInternal - private static class GlobalClearKeySetCallable<K, V> extends GlobalClearCallable { + private static class GlobalClearKeySetCallable<K, V> extends TopologyVersionAwareCallable { /** */ private static final long serialVersionUID = 0L; @@ -4979,33 +4880,25 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @param cacheName Cache name. + * @param topVer Affinity topology version. * @param keys Keys to clear. */ - private GlobalClearKeySetCallable(String cacheName, Set<? extends K> keys) { - super(cacheName); + protected GlobalClearKeySetCallable(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) { + super(cacheName, topVer); this.keys = keys; } /** {@inheritDoc} */ - @Override public Object call() throws Exception { - ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys); - - return null; + @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) { + return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, false).nodes(); } /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - super.writeExternal(out); - - out.writeObject(keys); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - super.readExternal(in); + @Override protected Object callLocal() { + ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys); - keys = (Set<K>) in.readObject(); + return null; } } @@ -5013,127 +4906,202 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * Internal callable for global size calculation. */ @GridInternal - private static class SizeCallable extends IgniteClosureX<Object, Integer> implements Externalizable { + private static class GlobalSizeCallable extends TopologyVersionAwareCallable { /** */ private static final long serialVersionUID = 0L; - /** Cache name. */ - private String cacheName; - /** Peek modes. */ private CachePeekMode[] peekModes; - /** Injected grid instance. */ - @IgniteInstanceResource - private Ignite ignite; + /** Near enable. */ + private boolean nearEnable; /** * Required by {@link Externalizable}. */ - public SizeCallable() { + public GlobalSizeCallable() { // No-op. } /** * @param cacheName Cache name. + * @param topVer Affinity topology version. * @param peekModes Cache peek modes. */ - private SizeCallable(String cacheName, CachePeekMode[] peekModes) { - this.cacheName = cacheName; + private GlobalSizeCallable(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes, boolean nearEnable) { + super(cacheName, topVer); + this.peekModes = peekModes; + this.nearEnable = nearEnable; } /** {@inheritDoc} */ - @Override public Integer applyx(Object o) throws IgniteCheckedException { - IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName); - - assert cache != null : cacheName; + @Override protected Object callLocal() { + try { + IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName); - return cache.localSize(peekModes); + return cache == null ? 0 : cache.localSize(peekModes); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } } /** {@inheritDoc} */ - @SuppressWarnings("ForLoopReplaceableByForEach") - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); + @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) { + IgniteClusterEx cluster = ctx.grid().cluster(); - out.writeInt(peekModes.length); + ClusterGroup grp = nearEnable ? cluster.forCacheNodes(ctx.name(), true, true, false) : cluster.forDataNodes(ctx.name()); - for (int i = 0; i < peekModes.length; i++) - U.writeEnum(out, peekModes[i]); + return grp.nodes(); } /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); + public String toString() { + return S.toString(GlobalSizeCallable.class, this); + } + } - int len = in.readInt(); + /** + * Cache size future. + */ + private static class SizeFuture extends RetryFuture { + /** Size. */ + private int size = 0; - peekModes = new CachePeekMode[len]; + /** + * @param peekModes Peek modes. + */ + public SizeFuture(CachePeekMode[] peekModes, GridCacheContext ctx, boolean near) { + super(ctx, new GlobalSizeCallable(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes, near)); + } - for (int i = 0; i < len; i++) - peekModes[i] = CachePeekMode.fromOrdinal(in.readByte()); + /** {@inheritDoc} */ + @Override protected void onInit() { + size = 0; } /** {@inheritDoc} */ - public String toString() { - return S.toString(SizeCallable.class, this); + @Override protected void onLocal(Object localRes) { + size += (Integer)localRes; + } + + /** {@inheritDoc} */ + @Override protected void allDone() { + onDone(size); } } /** - * Internal callable which performs {@link IgniteInternalCache#size()} or {@link IgniteInternalCache#primarySize()} - * operation on a cache with the given name. + * Cache clear future. */ - @GridInternal - private static class GlobalSizeCallable implements IgniteClosure<Object, Integer>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; + private static class ClearFuture extends RetryFuture { + /** + */ + public ClearFuture(GridCacheContext ctx, TopologyVersionAwareCallable clearCall) { + super(ctx, clearCall); + } - /** Cache name. */ - private String cacheName; + /** {@inheritDoc} */ + @Override protected void onInit() { + // No-op. + } - /** Primary only flag. */ - private boolean primaryOnly; + /** {@inheritDoc} */ + @Override protected void onLocal(Object localRes) { + // No-op. + } - /** Injected grid instance. */ - @IgniteInstanceResource - private Ignite ignite; + /** {@inheritDoc} */ + @Override protected void allDone() { + onDone(); + } + } + + /** + * Retry future. + */ + protected static abstract class RetryFuture<T> extends GridFutureAdapter<T> { + /** Context. */ + private final GridCacheContext ctx; + + /** Callable. */ + private final TopologyVersionAwareCallable call; + + /** Max retries count before issuing an error. */ + private volatile int retries = 32; /** - * Empty constructor for serialization. */ - public GlobalSizeCallable() { - // No-op. + public RetryFuture(GridCacheContext ctx, TopologyVersionAwareCallable call) { + this.ctx = ctx; + this.call = call; + + init(); } /** - * @param cacheName Cache name. - * @param primaryOnly Primary only flag. + * Init. */ - private GlobalSizeCallable(String cacheName, boolean primaryOnly) { - this.cacheName = cacheName; - this.primaryOnly = primaryOnly; - } + private void init() { + Collection<ClusterNode> nodes = call.nodes(ctx); - /** {@inheritDoc} */ - @Override public Integer apply(Object o) { - IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName); + call.topologyVersion(ctx.affinity().affinityTopologyVersion()); - return primaryOnly ? cache.primarySize() : cache.size(); - } + IgniteInternalFuture<Collection<Object>> fut = ctx.closures().callAsyncNoFailover(BROADCAST, + F.asSet((Callable<Object>)call), nodes, true); - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); - out.writeBoolean(primaryOnly); - } + fut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<Object>>>() { + @Override public void apply(IgniteInternalFuture<Collection<Object>> fut) { + try { + Collection<Object> res = fut.get(); - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); - primaryOnly = in.readBoolean(); + onInit(); + + for (Object locRes : res) { + if (locRes == FAIL) { + if (retries-- > 0) + init(); + else { + onDone(new ClusterTopologyException("Failed to wait topology.")); + + return; + } + } + + onLocal(locRes); + } + + allDone(); + } + catch (IgniteCheckedException e) { + if (X.hasCause(e, ClusterTopologyException.class)) { + if (retries-- > 0) + init(); + else + onDone(e); + } + else + onDone(e); + } + } + }); } + + /** + * Init reducer. + */ + protected abstract void onInit(); + + /** + * @param localRes Add local result to global result. + */ + protected abstract void onLocal(Object localRes); + + /** + * On done. + */ + protected abstract void allDone(); } /** @@ -5697,4 +5665,89 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V metrics.addPutAndGetTimeNanos(System.nanoTime() - start); } } + + /** + * Delayed callable class. + */ + protected static abstract class TopologyVersionAwareCallable<K, V> implements Serializable, Callable<Object> { + /** */ + private static final long serialVersionUID = 0L; + + /** Injected grid instance. */ + @IgniteInstanceResource + protected Ignite ignite; + + /** Affinity topology version. */ + protected AffinityTopologyVersion topVer; + + /** Cache name. */ + protected String cacheName; + + /** + * Empty constructor for serialization. + */ + public TopologyVersionAwareCallable() { + // No-op. + } + + /** + * @param topVer Affinity topology version. + */ + public TopologyVersionAwareCallable(String cacheName, AffinityTopologyVersion topVer) { + this.cacheName = cacheName; + this.topVer = topVer; + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + if (!compareTopologyVersions()) + return FAIL; + + Object res = callLocal(); + + if (!compareTopologyVersions()) + return FAIL; + else + return res; + } + + /** + * Call local. + * + * @return Local result. + * @throws IgniteCheckedException If failed. + */ + protected abstract Object callLocal() throws IgniteCheckedException; + + /** + * @param ctx Grid cache context. + * @return Nodes to call. + */ + protected abstract Collection<ClusterNode> nodes(GridCacheContext ctx); + + /** + * Compare topology versions. + */ + public boolean compareTopologyVersions() { + GridCacheProcessor cacheProc = ((IgniteKernal) ignite).context().cache(); + + GridCacheAdapter<K, V> cacheAdapter = cacheProc.internalCache(cacheName); + + if (cacheAdapter == null) + return false; + + final GridCacheContext<K, V> ctx = cacheAdapter.context(); + + AffinityTopologyVersion locTopVer = ctx.affinity().affinityTopologyVersion(); + + return locTopVer.compareTo(topVer) == 0; + } + + /** + * @param topVer Affinity topology version. + */ + public void topologyVersion(AffinityTopologyVersion topVer) { + this.topVer = topVer; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/139aa270/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index c0026ab..77fa104 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -124,6 +124,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Must use JDK marshaller since it is used by discovery to fire custom events. */ private Marshaller marshaller = new JdkMarshaller(); + /** Count down latch for caches. */ + private CountDownLatch cacheStartedLatch = new CountDownLatch(1); + /** * @param ctx Kernal context. */ @@ -657,87 +660,92 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void onKernalStart() throws IgniteCheckedException { - if (ctx.config().isDaemon()) - return; + try { + if (ctx.config().isDaemon()) + return; - ClusterNode locNode = ctx.discovery().localNode(); + ClusterNode locNode = ctx.discovery().localNode(); - // Init cache plugin managers. - final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>(); + // Init cache plugin managers. + final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>(); - for (DynamicCacheDescriptor desc : registeredCaches.values()) { - CacheConfiguration locCcfg = desc.cacheConfiguration(); + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + CacheConfiguration locCcfg = desc.cacheConfiguration(); - CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg); + CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg); - cache2PluginMgr.put(locCcfg.getName(), pluginMgr); - } + cache2PluginMgr.put(locCcfg.getName(), pluginMgr); + } - if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { - for (ClusterNode n : ctx.discovery().remoteNodes()) { - checkTransactionConfiguration(n); + if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { + for (ClusterNode n : ctx.discovery().remoteNodes()) { + checkTransactionConfiguration(n); - DeploymentMode locDepMode = ctx.config().getDeploymentMode(); - DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE); + DeploymentMode locDepMode = ctx.config().getDeploymentMode(); + DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE); - CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode", - locDepMode, rmtDepMode, true); + CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode", + locDepMode, rmtDepMode, true); - for (DynamicCacheDescriptor desc : registeredCaches.values()) { - CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id()); + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id()); - if (rmtCfg != null) { - CacheConfiguration locCfg = desc.cacheConfiguration(); + if (rmtCfg != null) { + CacheConfiguration locCfg = desc.cacheConfiguration(); - checkCache(locCfg, rmtCfg, n); + checkCache(locCfg, rmtCfg, n); - // Check plugin cache configurations. - CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName()); + // Check plugin cache configurations. + CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName()); - assert pluginMgr != null : " Map=" + cache2PluginMgr; + assert pluginMgr != null : " Map=" + cache2PluginMgr; - pluginMgr.validateRemotes(rmtCfg, n); + pluginMgr.validateRemotes(rmtCfg, n); + } } } } - } - // Start dynamic caches received from collect discovery data. - for (DynamicCacheDescriptor desc : registeredCaches.values()) { - boolean started = desc.onStart(); + // Start dynamic caches received from collect discovery data. + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + boolean started = desc.onStart(); - assert started : "Failed to change started flag for locally configured cache: " + desc; + assert started : "Failed to change started flag for locally configured cache: " + desc; - desc.clearRemoteConfigurations(); + desc.clearRemoteConfigurations(); - CacheConfiguration ccfg = desc.cacheConfiguration(); + CacheConfiguration ccfg = desc.cacheConfiguration(); - IgnitePredicate filter = ccfg.getNodeFilter(); + IgnitePredicate filter = ccfg.getNodeFilter(); - if (filter.apply(locNode)) { - CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); + if (filter.apply(locNode)) { + CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); - CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName()); + CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName()); - assert pluginMgr != null : " Map=" + cache2PluginMgr; + assert pluginMgr != null : " Map=" + cache2PluginMgr; - GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx); + GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx); - ctx.dynamicDeploymentId(desc.deploymentId()); + ctx.dynamicDeploymentId(desc.deploymentId()); - sharedCtx.addCacheContext(ctx); + sharedCtx.addCacheContext(ctx); - GridCacheAdapter cache = ctx.cache(); + GridCacheAdapter cache = ctx.cache(); - String name = ccfg.getName(); + String name = ccfg.getName(); - caches.put(maskNull(name), cache); + caches.put(maskNull(name), cache); - startCache(cache); + startCache(cache); - jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false)); + jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false)); + } } } + finally { + cacheStartedLatch.countDown(); + } ctx.marshallerContext().onMarshallerCacheStarted(ctx); @@ -835,6 +843,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void onKernalStop(boolean cancel) { + cacheStartedLatch.countDown(); + if (ctx.config().isDaemon()) return; @@ -2686,6 +2696,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (log.isDebugEnabled()) log.debug("Getting internal cache adapter: " + name); + try { + cacheStartedLatch.await(); + } + catch (InterruptedException e) { + throw new IgniteException("Failed to wait starting caches."); + } + return (GridCacheAdapter<K, V>)caches.get(maskNull(name)); }