Revert "#ignite-732: IgniteCache.size() should not fail in case of topology changes."
This reverts commit 139aa270ae61494c0757867f2dc531ec7251b1da. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c4bc9297 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c4bc9297 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c4bc9297 Branch: refs/heads/ignite-836_2 Commit: c4bc92974bace5e4cdb3ac9dc80790193e46d203 Parents: 281f4ef Author: ivasilinets <ivasilin...@gridgain.com> Authored: Sat May 2 10:05:35 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Sat May 2 10:05:35 2015 +0300 ---------------------------------------------------------------------- .../ignite/compute/ComputeTaskAdapter.java | 14 +- .../processors/cache/GridCacheAdapter.java | 503 +++++++++---------- .../processors/cache/GridCacheProcessor.java | 109 ++-- 3 files changed, 277 insertions(+), 349 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4bc9297/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java index 87081fc..c2ad198 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java @@ -24,16 +24,15 @@ import java.util.*; /** * Convenience adapter for {@link ComputeTask} interface. Here is an example of - * how {@code ComputeTaskAdapter} can be used: + * how {@code GridComputeTaskAdapter} can be used: * <pre name="code" class="java"> - * public class MyFooBarTask extends ComputeTaskAdapter<String, String> { + * public class MyFooBarTask extends GridComputeTaskAdapter<String, String> { * // Inject load balancer. * @LoadBalancerResource * ComputeLoadBalancer balancer; * * // Map jobs to grid nodes. - * public Map<? extends ComputeJob, GridNode> map(List<GridNode> subgrid, String arg) - * throws IgniteCheckedException { + * public Map<? extends ComputeJob, GridNode> map(List<GridNode> subgrid, String arg) throws IgniteCheckedException { * Map<MyFooBarJob, GridNode> jobs = new HashMap<MyFooBarJob, GridNode>(subgrid.size()); * * // In more complex cases, you can actually do @@ -77,8 +76,8 @@ public abstract class ComputeTaskAdapter<T, R> implements ComputeTask<T, R> { * <p> * If remote job resulted in exception ({@link ComputeJobResult#getException()} is not {@code null}), * then {@link ComputeJobResultPolicy#FAILOVER} policy will be returned if the exception is instance - * of {@link org.apache.ignite.cluster.ClusterTopologyException} or {@link ComputeExecutionRejectedException}, - * which means that remote node either failed or job execution was rejected before it got a chance to start. In all + * of {@link org.apache.ignite.cluster.ClusterTopologyException} or {@link ComputeExecutionRejectedException}, which means that + * remote node either failed or job execution was rejected before it got a chance to start. In all * other cases the exception will be rethrown which will ultimately cause task to fail. * * @param res Received remote grid executable result. @@ -88,8 +87,7 @@ public abstract class ComputeTaskAdapter<T, R> implements ComputeTask<T, R> { * @throws IgniteException If handling a job result caused an error effectively rejecting * a failover. This exception will be thrown out of {@link ComputeTaskFuture#get()} method. */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) - throws IgniteException { + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException { IgniteException e = res.getException(); // Try to failover if result is failed. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4bc9297/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 39f19b1..3f4e97b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -75,9 +75,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** */ private static final long serialVersionUID = 0L; - /** Failed result. */ - private static final Object FAIL = new Integer(-1); - /** clearLocally() split threshold. */ public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000; @@ -885,7 +882,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public Set<Cache.Entry<K, V>> entrySet() { - return entrySet((CacheEntryPredicate[])null); + return entrySet((CacheEntryPredicate[]) null); } /** {@inheritDoc} */ @@ -900,17 +897,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public Set<K> keySet() { - return keySet((CacheEntryPredicate[])null); + return keySet((CacheEntryPredicate[]) null); } /** {@inheritDoc} */ @Override public Set<K> primaryKeySet() { - return primaryKeySet((CacheEntryPredicate[])null); + return primaryKeySet((CacheEntryPredicate[]) null); } /** {@inheritDoc} */ @Override public Collection<V> values() { - return values((CacheEntryPredicate[])null); + return values((CacheEntryPredicate[]) null); } /** @@ -1083,31 +1080,36 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public void clear() throws IgniteCheckedException { - clearAll(0, new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion())); + // Clear local cache synchronously. + clearLocally(); + + clearRemotes(0, new GlobalClearAllCallable(name())); } /** {@inheritDoc} */ @Override public void clear(K key) throws IgniteCheckedException { - clearAll(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), - Collections.singleton(key))); + // Clear local cache synchronously. + clearLocally(key); + + clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key))); } /** {@inheritDoc} */ @Override public void clearAll(Set<? extends K> keys) throws IgniteCheckedException { - clearAll(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), - keys)); + // Clear local cache synchronously. + clearLocallyAll(keys); + + clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), keys)); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync(K key) { - return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), - Collections.singleton(key))); + return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key))); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys) { - return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), - keys)); + return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), keys)); } /** @@ -1116,13 +1118,19 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param clearCall Global clear callable object. * @throws IgniteCheckedException In case of cache could not be cleared on any of the nodes. */ - private void clearAll(long timeout, TopologyVersionAwareCallable clearCall) throws IgniteCheckedException { + private void clearRemotes(long timeout, GlobalClearCallable clearCall) throws IgniteCheckedException { try { + // Send job to remote nodes only. + Collection<ClusterNode> nodes = + ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes().nodes(); + IgniteInternalFuture<Object> fut = null; - ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout); + if (!nodes.isEmpty()) { + ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout); - fut = new ClearFuture(ctx, clearCall); + fut = ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true); + } if (fut != null) fut.get(); @@ -1141,18 +1149,19 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync() { - return clearAsync(new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion())); + return clearAsync(new GlobalClearAllCallable(name())); } /** * @param clearCall Global clear callable object. * @return Future. */ - private IgniteInternalFuture<?> clearAsync(TopologyVersionAwareCallable clearCall) { + private IgniteInternalFuture<?> clearAsync(GlobalClearCallable clearCall) { Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name(), true, true, false).nodes(); if (!nodes.isEmpty()) { - IgniteInternalFuture<Object> fut = new ClearFuture(ctx, clearCall); + IgniteInternalFuture<Object> fut = + ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true); return fut.chain(new CX1<IgniteInternalFuture<Object>, Object>() { @Override public Object applyx(IgniteInternalFuture<Object> fut) throws IgniteCheckedException { @@ -2108,7 +2117,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V throws IgniteCheckedException { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() { - @Override public EntryProcessor apply(K k) { + @Override public EntryProcessor apply(K k) { return entryProcessor; } }); @@ -2136,7 +2145,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp() { @Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter tx) { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = - Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor); + Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor); return tx.invokeAsync(ctx, invokeMap, args); } @@ -2362,7 +2371,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() { @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()) - .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL); + .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL); } @Override public String toString() { @@ -2517,7 +2526,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return asyncOp(new AsyncOp<Boolean>() { @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).chain( - (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); + (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG); } @Override public String toString() { @@ -2906,7 +2915,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (ctx.deploymentEnabled()) ctx.deploy().registerClass(oldVal); - return tx.putAllAsync(ctx, + return (GridCacheReturn) tx.putAllAsync(ctx, F.t(key, newVal), true, null, @@ -3008,7 +3017,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.deploy().registerClass(val); return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, - ctx.equalsValArray(val)).get().success(); + ctx.equalsValArray(val)).get().success(); } @Override public String toString() { @@ -3221,10 +3230,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V TransactionConfiguration cfg = ctx.gridConfig().getTransactionConfiguration(); return txStart( - concurrency, - isolation, - cfg.getDefaultTxTimeout(), - 0 + concurrency, + isolation, + cfg.getDefaultTxTimeout(), + 0 ); } @@ -3567,7 +3576,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (nodes.isEmpty()) return new GridFinishedFuture<>(0); - return new SizeFuture(peekModes, ctx, modes.near); + IgniteInternalFuture<Collection<Integer>> fut = + ctx.closures().broadcastNoFailover(new SizeCallable(ctx.name(), peekModes), null, nodes); + + return fut.chain(new CX1<IgniteInternalFuture<Collection<Integer>>, Integer>() { + @Override public Integer applyx(IgniteInternalFuture<Collection<Integer>> fut) + throws IgniteCheckedException { + Collection<Integer> res = fut.get(); + + int totalSize = 0; + + for (Integer size : res) + totalSize += size; + + return totalSize; + } + }); } /** {@inheritDoc} */ @@ -3651,7 +3675,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return F.iterator(iterator(), new IgniteClosure<Cache.Entry<K, V>, Cache.Entry<K, V>>() { private IgniteCacheExpiryPolicy expiryPlc = - ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null); + ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null); @Override public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry) { CacheOperationContext prev = ctx.gate().enter(opCtx); @@ -3885,6 +3909,50 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** + * Gets cache global size (with or without backups). + * + * @param primaryOnly {@code True} if only primary sizes should be included. + * @return Global size. + * @throws IgniteCheckedException If internal task execution failed. + */ + private int globalSize(boolean primaryOnly) throws IgniteCheckedException { + try { + // Send job to remote nodes only. + Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes(); + + IgniteInternalFuture<Collection<Integer>> fut = null; + + if (!nodes.isEmpty()) { + ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, gridCfg.getNetworkTimeout()); + + fut = ctx.closures().broadcastNoFailover(new GlobalSizeCallable(name(), primaryOnly), null, nodes); + } + + // Get local value. + int globalSize = primaryOnly ? primarySize() : size(); + + if (fut != null) { + for (Integer i : fut.get()) + globalSize += i; + } + + return globalSize; + } + catch (ClusterGroupEmptyCheckedException ignore) { + if (log.isDebugEnabled()) + log.debug("All remote nodes left while cache clearLocally [cacheName=" + name() + "]"); + + return primaryOnly ? primarySize() : size(); + } + catch (ComputeTaskTimeoutCheckedException e) { + U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " + + "'networkTimeout' configuration property) [cacheName=" + name() + "]"); + + throw e; + } + } + + /** * @param op Cache operation. * @param <T> Return type. * @return Operation result. @@ -4825,10 +4893,47 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** + * Internal callable which performs clear operation on a cache with the given name. + */ + @GridInternal + private static abstract class GlobalClearCallable implements Callable<Object>, Externalizable { + /** Cache name. */ + protected String cacheName; + + /** Injected grid instance. */ + @IgniteInstanceResource + protected Ignite ignite; + + /** + * Empty constructor for serialization. + */ + public GlobalClearCallable() { + // No-op. + } + + /** + * @param cacheName Cache name. + */ + protected GlobalClearCallable(String cacheName) { + this.cacheName = cacheName; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, cacheName); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cacheName = U.readString(in); + } + } + + /** * Global clear all. */ @GridInternal - private static class GlobalClearAllCallable extends TopologyVersionAwareCallable { + private static class GlobalClearAllCallable extends GlobalClearCallable { /** */ private static final long serialVersionUID = 0L; @@ -4841,30 +4946,24 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @param cacheName Cache name. - * @param topVer Affinity topology version. */ - private GlobalClearAllCallable(String cacheName, AffinityTopologyVersion topVer) { - super(cacheName, topVer); + private GlobalClearAllCallable(String cacheName) { + super(cacheName); } /** {@inheritDoc} */ - @Override protected Object callLocal() { + @Override public Object call() throws Exception { ((IgniteEx)ignite).cachex(cacheName).clearLocally(); return null; } - - /** {@inheritDoc} */ - @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) { - return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, false).nodes(); - } } /** * Global clear keys. */ @GridInternal - private static class GlobalClearKeySetCallable<K, V> extends TopologyVersionAwareCallable { + private static class GlobalClearKeySetCallable<K, V> extends GlobalClearCallable { /** */ private static final long serialVersionUID = 0L; @@ -4880,25 +4979,33 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @param cacheName Cache name. - * @param topVer Affinity topology version. * @param keys Keys to clear. */ - protected GlobalClearKeySetCallable(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) { - super(cacheName, topVer); + private GlobalClearKeySetCallable(String cacheName, Set<? extends K> keys) { + super(cacheName); this.keys = keys; } /** {@inheritDoc} */ - @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) { - return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, false).nodes(); + @Override public Object call() throws Exception { + ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys); + + return null; } /** {@inheritDoc} */ - @Override protected Object callLocal() { - ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys); + @Override public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); - return null; + out.writeObject(keys); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + keys = (Set<K>) in.readObject(); } } @@ -4906,202 +5013,127 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * Internal callable for global size calculation. */ @GridInternal - private static class GlobalSizeCallable extends TopologyVersionAwareCallable { + private static class SizeCallable extends IgniteClosureX<Object, Integer> implements Externalizable { /** */ private static final long serialVersionUID = 0L; + /** Cache name. */ + private String cacheName; + /** Peek modes. */ private CachePeekMode[] peekModes; - /** Near enable. */ - private boolean nearEnable; + /** Injected grid instance. */ + @IgniteInstanceResource + private Ignite ignite; /** * Required by {@link Externalizable}. */ - public GlobalSizeCallable() { + public SizeCallable() { // No-op. } /** * @param cacheName Cache name. - * @param topVer Affinity topology version. * @param peekModes Cache peek modes. */ - private GlobalSizeCallable(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes, boolean nearEnable) { - super(cacheName, topVer); - + private SizeCallable(String cacheName, CachePeekMode[] peekModes) { + this.cacheName = cacheName; this.peekModes = peekModes; - this.nearEnable = nearEnable; - } - - /** {@inheritDoc} */ - @Override protected Object callLocal() { - try { - IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName); - - return cache == null ? 0 : cache.localSize(peekModes); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } } /** {@inheritDoc} */ - @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) { - IgniteClusterEx cluster = ctx.grid().cluster(); + @Override public Integer applyx(Object o) throws IgniteCheckedException { + IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName); - ClusterGroup grp = nearEnable ? cluster.forCacheNodes(ctx.name(), true, true, false) : cluster.forDataNodes(ctx.name()); + assert cache != null : cacheName; - return grp.nodes(); + return cache.localSize(peekModes); } /** {@inheritDoc} */ - public String toString() { - return S.toString(GlobalSizeCallable.class, this); - } - } - - /** - * Cache size future. - */ - private static class SizeFuture extends RetryFuture { - /** Size. */ - private int size = 0; - - /** - * @param peekModes Peek modes. - */ - public SizeFuture(CachePeekMode[] peekModes, GridCacheContext ctx, boolean near) { - super(ctx, new GlobalSizeCallable(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes, near)); - } + @SuppressWarnings("ForLoopReplaceableByForEach") + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, cacheName); - /** {@inheritDoc} */ - @Override protected void onInit() { - size = 0; - } + out.writeInt(peekModes.length); - /** {@inheritDoc} */ - @Override protected void onLocal(Object localRes) { - size += (Integer)localRes; + for (int i = 0; i < peekModes.length; i++) + U.writeEnum(out, peekModes[i]); } /** {@inheritDoc} */ - @Override protected void allDone() { - onDone(size); - } - } + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cacheName = U.readString(in); - /** - * Cache clear future. - */ - private static class ClearFuture extends RetryFuture { - /** - */ - public ClearFuture(GridCacheContext ctx, TopologyVersionAwareCallable clearCall) { - super(ctx, clearCall); - } + int len = in.readInt(); - /** {@inheritDoc} */ - @Override protected void onInit() { - // No-op. - } + peekModes = new CachePeekMode[len]; - /** {@inheritDoc} */ - @Override protected void onLocal(Object localRes) { - // No-op. + for (int i = 0; i < len; i++) + peekModes[i] = CachePeekMode.fromOrdinal(in.readByte()); } /** {@inheritDoc} */ - @Override protected void allDone() { - onDone(); + public String toString() { + return S.toString(SizeCallable.class, this); } } /** - * Retry future. + * Internal callable which performs {@link IgniteInternalCache#size()} or {@link IgniteInternalCache#primarySize()} + * operation on a cache with the given name. */ - protected static abstract class RetryFuture<T> extends GridFutureAdapter<T> { - /** Context. */ - private final GridCacheContext ctx; + @GridInternal + private static class GlobalSizeCallable implements IgniteClosure<Object, Integer>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; - /** Callable. */ - private final TopologyVersionAwareCallable call; + /** Cache name. */ + private String cacheName; - /** Max retries count before issuing an error. */ - private volatile int retries = 32; + /** Primary only flag. */ + private boolean primaryOnly; + + /** Injected grid instance. */ + @IgniteInstanceResource + private Ignite ignite; /** + * Empty constructor for serialization. */ - public RetryFuture(GridCacheContext ctx, TopologyVersionAwareCallable call) { - this.ctx = ctx; - this.call = call; - - init(); + public GlobalSizeCallable() { + // No-op. } /** - * Init. + * @param cacheName Cache name. + * @param primaryOnly Primary only flag. */ - private void init() { - Collection<ClusterNode> nodes = call.nodes(ctx); - - call.topologyVersion(ctx.affinity().affinityTopologyVersion()); - - IgniteInternalFuture<Collection<Object>> fut = ctx.closures().callAsyncNoFailover(BROADCAST, - F.asSet((Callable<Object>)call), nodes, true); - - fut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<Object>>>() { - @Override public void apply(IgniteInternalFuture<Collection<Object>> fut) { - try { - Collection<Object> res = fut.get(); - - onInit(); - - for (Object locRes : res) { - if (locRes == FAIL) { - if (retries-- > 0) - init(); - else { - onDone(new ClusterTopologyException("Failed to wait topology.")); - - return; - } - } + private GlobalSizeCallable(String cacheName, boolean primaryOnly) { + this.cacheName = cacheName; + this.primaryOnly = primaryOnly; + } - onLocal(locRes); - } + /** {@inheritDoc} */ + @Override public Integer apply(Object o) { + IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName); - allDone(); - } - catch (IgniteCheckedException e) { - if (X.hasCause(e, ClusterTopologyException.class)) { - if (retries-- > 0) - init(); - else - onDone(e); - } - else - onDone(e); - } - } - }); + return primaryOnly ? cache.primarySize() : cache.size(); } - /** - * Init reducer. - */ - protected abstract void onInit(); - - /** - * @param localRes Add local result to global result. - */ - protected abstract void onLocal(Object localRes); + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, cacheName); + out.writeBoolean(primaryOnly); + } - /** - * On done. - */ - protected abstract void allDone(); + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cacheName = U.readString(in); + primaryOnly = in.readBoolean(); + } } /** @@ -5665,89 +5697,4 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V metrics.addPutAndGetTimeNanos(System.nanoTime() - start); } } - - /** - * Delayed callable class. - */ - protected static abstract class TopologyVersionAwareCallable<K, V> implements Serializable, Callable<Object> { - /** */ - private static final long serialVersionUID = 0L; - - /** Injected grid instance. */ - @IgniteInstanceResource - protected Ignite ignite; - - /** Affinity topology version. */ - protected AffinityTopologyVersion topVer; - - /** Cache name. */ - protected String cacheName; - - /** - * Empty constructor for serialization. - */ - public TopologyVersionAwareCallable() { - // No-op. - } - - /** - * @param topVer Affinity topology version. - */ - public TopologyVersionAwareCallable(String cacheName, AffinityTopologyVersion topVer) { - this.cacheName = cacheName; - this.topVer = topVer; - } - - /** {@inheritDoc} */ - @Override public Object call() throws Exception { - if (!compareTopologyVersions()) - return FAIL; - - Object res = callLocal(); - - if (!compareTopologyVersions()) - return FAIL; - else - return res; - } - - /** - * Call local. - * - * @return Local result. - * @throws IgniteCheckedException If failed. - */ - protected abstract Object callLocal() throws IgniteCheckedException; - - /** - * @param ctx Grid cache context. - * @return Nodes to call. - */ - protected abstract Collection<ClusterNode> nodes(GridCacheContext ctx); - - /** - * Compare topology versions. - */ - public boolean compareTopologyVersions() { - GridCacheProcessor cacheProc = ((IgniteKernal) ignite).context().cache(); - - GridCacheAdapter<K, V> cacheAdapter = cacheProc.internalCache(cacheName); - - if (cacheAdapter == null) - return false; - - final GridCacheContext<K, V> ctx = cacheAdapter.context(); - - AffinityTopologyVersion locTopVer = ctx.affinity().affinityTopologyVersion(); - - return locTopVer.compareTo(topVer) == 0; - } - - /** - * @param topVer Affinity topology version. - */ - public void topologyVersion(AffinityTopologyVersion topVer) { - this.topVer = topVer; - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c4bc9297/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 77fa104..c0026ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -124,9 +124,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Must use JDK marshaller since it is used by discovery to fire custom events. */ private Marshaller marshaller = new JdkMarshaller(); - /** Count down latch for caches. */ - private CountDownLatch cacheStartedLatch = new CountDownLatch(1); - /** * @param ctx Kernal context. */ @@ -660,92 +657,87 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void onKernalStart() throws IgniteCheckedException { - try { - if (ctx.config().isDaemon()) - return; + if (ctx.config().isDaemon()) + return; - ClusterNode locNode = ctx.discovery().localNode(); + ClusterNode locNode = ctx.discovery().localNode(); - // Init cache plugin managers. - final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>(); + // Init cache plugin managers. + final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>(); - for (DynamicCacheDescriptor desc : registeredCaches.values()) { - CacheConfiguration locCcfg = desc.cacheConfiguration(); + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + CacheConfiguration locCcfg = desc.cacheConfiguration(); - CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg); + CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg); - cache2PluginMgr.put(locCcfg.getName(), pluginMgr); - } + cache2PluginMgr.put(locCcfg.getName(), pluginMgr); + } - if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { - for (ClusterNode n : ctx.discovery().remoteNodes()) { - checkTransactionConfiguration(n); + if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { + for (ClusterNode n : ctx.discovery().remoteNodes()) { + checkTransactionConfiguration(n); - DeploymentMode locDepMode = ctx.config().getDeploymentMode(); - DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE); + DeploymentMode locDepMode = ctx.config().getDeploymentMode(); + DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE); - CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode", - locDepMode, rmtDepMode, true); + CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode", + locDepMode, rmtDepMode, true); - for (DynamicCacheDescriptor desc : registeredCaches.values()) { - CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id()); + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id()); - if (rmtCfg != null) { - CacheConfiguration locCfg = desc.cacheConfiguration(); + if (rmtCfg != null) { + CacheConfiguration locCfg = desc.cacheConfiguration(); - checkCache(locCfg, rmtCfg, n); + checkCache(locCfg, rmtCfg, n); - // Check plugin cache configurations. - CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName()); + // Check plugin cache configurations. + CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName()); - assert pluginMgr != null : " Map=" + cache2PluginMgr; + assert pluginMgr != null : " Map=" + cache2PluginMgr; - pluginMgr.validateRemotes(rmtCfg, n); - } + pluginMgr.validateRemotes(rmtCfg, n); } } } + } - // Start dynamic caches received from collect discovery data. - for (DynamicCacheDescriptor desc : registeredCaches.values()) { - boolean started = desc.onStart(); + // Start dynamic caches received from collect discovery data. + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + boolean started = desc.onStart(); - assert started : "Failed to change started flag for locally configured cache: " + desc; + assert started : "Failed to change started flag for locally configured cache: " + desc; - desc.clearRemoteConfigurations(); + desc.clearRemoteConfigurations(); - CacheConfiguration ccfg = desc.cacheConfiguration(); + CacheConfiguration ccfg = desc.cacheConfiguration(); - IgnitePredicate filter = ccfg.getNodeFilter(); + IgnitePredicate filter = ccfg.getNodeFilter(); - if (filter.apply(locNode)) { - CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); + if (filter.apply(locNode)) { + CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); - CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName()); + CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName()); - assert pluginMgr != null : " Map=" + cache2PluginMgr; + assert pluginMgr != null : " Map=" + cache2PluginMgr; - GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx); + GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx); - ctx.dynamicDeploymentId(desc.deploymentId()); + ctx.dynamicDeploymentId(desc.deploymentId()); - sharedCtx.addCacheContext(ctx); + sharedCtx.addCacheContext(ctx); - GridCacheAdapter cache = ctx.cache(); + GridCacheAdapter cache = ctx.cache(); - String name = ccfg.getName(); + String name = ccfg.getName(); - caches.put(maskNull(name), cache); + caches.put(maskNull(name), cache); - startCache(cache); + startCache(cache); - jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false)); - } + jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false)); } } - finally { - cacheStartedLatch.countDown(); - } ctx.marshallerContext().onMarshallerCacheStarted(ctx); @@ -843,8 +835,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void onKernalStop(boolean cancel) { - cacheStartedLatch.countDown(); - if (ctx.config().isDaemon()) return; @@ -2696,13 +2686,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (log.isDebugEnabled()) log.debug("Getting internal cache adapter: " + name); - try { - cacheStartedLatch.await(); - } - catch (InterruptedException e) { - throw new IgniteException("Failed to wait starting caches."); - } - return (GridCacheAdapter<K, V>)caches.get(maskNull(name)); }