Repository: incubator-ignite Updated Branches: refs/heads/ignite-732 1b6f1e5b5 -> e50ba9efe (forced update)
ignite-732 IgniteCache.size() should not fail in case of topology changes Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e50ba9ef Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e50ba9ef Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e50ba9ef Branch: refs/heads/ignite-732 Commit: e50ba9efeb098dc0bc583d9d921cf715bac8feff Parents: dfca76b Author: agura <ag...@gridgain.com> Authored: Wed Apr 29 22:20:57 2015 +0300 Committer: agura <ag...@gridgain.com> Committed: Wed Apr 29 22:44:13 2015 +0300 ---------------------------------------------------------------------- .../ignite/compute/ComputeTaskAdapter.java | 14 +- .../processors/cache/GridCacheAdapter.java | 169 +++++++------------ .../processors/task/GridTaskWorker.java | 3 +- .../cache/GridCacheSizeTopologyChangedTest.java | 140 +++++++++++++++ 4 files changed, 208 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ba9ef/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java index c2ad198..87081fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java @@ -24,15 +24,16 @@ import java.util.*; /** * Convenience adapter for {@link ComputeTask} interface. Here is an example of - * how {@code GridComputeTaskAdapter} can be used: + * how {@code ComputeTaskAdapter} can be used: * <pre name="code" class="java"> - * public class MyFooBarTask extends GridComputeTaskAdapter<String, String> { + * public class MyFooBarTask extends ComputeTaskAdapter<String, String> { * // Inject load balancer. * @LoadBalancerResource * ComputeLoadBalancer balancer; * * // Map jobs to grid nodes. - * public Map<? extends ComputeJob, GridNode> map(List<GridNode> subgrid, String arg) throws IgniteCheckedException { + * public Map<? extends ComputeJob, GridNode> map(List<GridNode> subgrid, String arg) + * throws IgniteCheckedException { * Map<MyFooBarJob, GridNode> jobs = new HashMap<MyFooBarJob, GridNode>(subgrid.size()); * * // In more complex cases, you can actually do @@ -76,8 +77,8 @@ public abstract class ComputeTaskAdapter<T, R> implements ComputeTask<T, R> { * <p> * If remote job resulted in exception ({@link ComputeJobResult#getException()} is not {@code null}), * then {@link ComputeJobResultPolicy#FAILOVER} policy will be returned if the exception is instance - * of {@link org.apache.ignite.cluster.ClusterTopologyException} or {@link ComputeExecutionRejectedException}, which means that - * remote node either failed or job execution was rejected before it got a chance to start. In all + * of {@link org.apache.ignite.cluster.ClusterTopologyException} or {@link ComputeExecutionRejectedException}, + * which means that remote node either failed or job execution was rejected before it got a chance to start. In all * other cases the exception will be rethrown which will ultimately cause task to fail. * * @param res Received remote grid executable result. @@ -87,7 +88,8 @@ public abstract class ComputeTaskAdapter<T, R> implements ComputeTask<T, R> { * @throws IgniteException If handling a job result caused an error effectively rejecting * a failover. This exception will be thrown out of {@link ComputeTaskFuture#get()} method. */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException { + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) + throws IgniteException { IgniteException e = res.getException(); // Try to failover if result is failed. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ba9ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 3f4e97b..ac508d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -882,7 +882,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public Set<Cache.Entry<K, V>> entrySet() { - return entrySet((CacheEntryPredicate[]) null); + return entrySet((CacheEntryPredicate[])null); } /** {@inheritDoc} */ @@ -897,17 +897,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public Set<K> keySet() { - return keySet((CacheEntryPredicate[]) null); + return keySet((CacheEntryPredicate[])null); } /** {@inheritDoc} */ @Override public Set<K> primaryKeySet() { - return primaryKeySet((CacheEntryPredicate[]) null); + return primaryKeySet((CacheEntryPredicate[])null); } /** {@inheritDoc} */ @Override public Collection<V> values() { - return values((CacheEntryPredicate[]) null); + return values((CacheEntryPredicate[])null); } /** @@ -2117,7 +2117,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V throws IgniteCheckedException { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() { - @Override public EntryProcessor apply(K k) { + @Override public EntryProcessor apply(K k) { return entryProcessor; } }); @@ -2145,7 +2145,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp() { @Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter tx) { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = - Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor); + Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor); return tx.invokeAsync(ctx, invokeMap, args); } @@ -2371,7 +2371,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() { @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()) - .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL); + .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL); } @Override public String toString() { @@ -2526,7 +2526,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return asyncOp(new AsyncOp<Boolean>() { @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).chain( - (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG); + (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); } @Override public String toString() { @@ -2915,7 +2915,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (ctx.deploymentEnabled()) ctx.deploy().registerClass(oldVal); - return (GridCacheReturn) tx.putAllAsync(ctx, + return tx.putAllAsync(ctx, F.t(key, newVal), true, null, @@ -3017,7 +3017,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.deploy().registerClass(val); return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, - ctx.equalsValArray(val)).get().success(); + ctx.equalsValArray(val)).get().success(); } @Override public String toString() { @@ -3230,10 +3230,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V TransactionConfiguration cfg = ctx.gridConfig().getTransactionConfiguration(); return txStart( - concurrency, - isolation, - cfg.getDefaultTxTimeout(), - 0 + concurrency, + isolation, + cfg.getDefaultTxTimeout(), + 0 ); } @@ -3576,22 +3576,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (nodes.isEmpty()) return new GridFinishedFuture<>(0); - IgniteInternalFuture<Collection<Integer>> fut = - ctx.closures().broadcastNoFailover(new SizeCallable(ctx.name(), peekModes), null, nodes); - - return fut.chain(new CX1<IgniteInternalFuture<Collection<Integer>>, Integer>() { - @Override public Integer applyx(IgniteInternalFuture<Collection<Integer>> fut) - throws IgniteCheckedException { - Collection<Integer> res = fut.get(); - - int totalSize = 0; - - for (Integer size : res) - totalSize += size; - - return totalSize; - } - }); + return new SizeFuture(peekModes, ctx, nodes); } /** {@inheritDoc} */ @@ -3675,7 +3660,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return F.iterator(iterator(), new IgniteClosure<Cache.Entry<K, V>, Cache.Entry<K, V>>() { private IgniteCacheExpiryPolicy expiryPlc = - ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null); + ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null); @Override public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry) { CacheOperationContext prev = ctx.gate().enter(opCtx); @@ -3909,50 +3894,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** - * Gets cache global size (with or without backups). - * - * @param primaryOnly {@code True} if only primary sizes should be included. - * @return Global size. - * @throws IgniteCheckedException If internal task execution failed. - */ - private int globalSize(boolean primaryOnly) throws IgniteCheckedException { - try { - // Send job to remote nodes only. - Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes(); - - IgniteInternalFuture<Collection<Integer>> fut = null; - - if (!nodes.isEmpty()) { - ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, gridCfg.getNetworkTimeout()); - - fut = ctx.closures().broadcastNoFailover(new GlobalSizeCallable(name(), primaryOnly), null, nodes); - } - - // Get local value. - int globalSize = primaryOnly ? primarySize() : size(); - - if (fut != null) { - for (Integer i : fut.get()) - globalSize += i; - } - - return globalSize; - } - catch (ClusterGroupEmptyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("All remote nodes left while cache clearLocally [cacheName=" + name() + "]"); - - return primaryOnly ? primarySize() : size(); - } - catch (ComputeTaskTimeoutCheckedException e) { - U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider increasing " + - "'networkTimeout' configuration property) [cacheName=" + name() + "]"); - - throw e; - } - } - - /** * @param op Cache operation. * @param <T> Return type. * @return Operation result. @@ -5082,57 +5023,63 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** - * Internal callable which performs {@link IgniteInternalCache#size()} or {@link IgniteInternalCache#primarySize()} - * operation on a cache with the given name. + * Cache size future. */ - @GridInternal - private static class GlobalSizeCallable implements IgniteClosure<Object, Integer>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; + private static class SizeFuture extends GridFutureAdapter<Integer> { + /** Peek modes. */ + private final CachePeekMode[] peekModes; - /** Cache name. */ - private String cacheName; + /** Context. */ + private final GridCacheContext ctx; - /** Primary only flag. */ - private boolean primaryOnly; + /** Nodes. */ + private final Collection<ClusterNode> nodes; - /** Injected grid instance. */ - @IgniteInstanceResource - private Ignite ignite; + /** Max retries count before issuing an error. */ + private int retries = 32; /** - * Empty constructor for serialization. + * @param peekModes Peek modes. */ - public GlobalSizeCallable() { - // No-op. + public SizeFuture(CachePeekMode[] peekModes, GridCacheContext ctx, Collection<ClusterNode> nodes) { + this.peekModes = peekModes; + this.ctx = ctx; + this.nodes = nodes; + + init(); } /** - * @param cacheName Cache name. - * @param primaryOnly Primary only flag. + * Init. */ - private GlobalSizeCallable(String cacheName, boolean primaryOnly) { - this.cacheName = cacheName; - this.primaryOnly = primaryOnly; - } + private void init() { + IgniteInternalFuture<Collection<Integer>> fut = + ctx.closures().broadcastNoFailover(new SizeCallable(ctx.name(), peekModes), null, nodes); - /** {@inheritDoc} */ - @Override public Integer apply(Object o) { - IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName); + fut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<Integer>>>() { + @Override public void apply(IgniteInternalFuture<Collection<Integer>> fut) { + try { + Collection<Integer> res = fut.get(); - return primaryOnly ? cache.primarySize() : cache.size(); - } + int size = 0; - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); - out.writeBoolean(primaryOnly); - } + for (Integer locSize : res) + size += locSize; - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); - primaryOnly = in.readBoolean(); + onDone(size); + } + catch (IgniteCheckedException e) { + if (X.hasCause(e, ClusterTopologyException.class, NullPointerException.class)) { + if (retries-- > 0) + init(); + else + onDone(e); + } + else + onDone(e); + } + } + }); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ba9ef/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index f6d686c..0f8b36b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -852,7 +852,8 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { } catch (IgniteException e) { if (X.hasCause(e, GridInternalException.class) || - X.hasCause(e, IgfsOutOfSpaceException.class)) { + X.hasCause(e, IgfsOutOfSpaceException.class) || + X.hasCause(e, ClusterTopologyException.class)) { // Print internal exceptions only if debug is enabled. if (log.isDebugEnabled()) U.error(log, "Failed to obtain remote job result policy for result from " + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ba9ef/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java new file mode 100644 index 0000000..031caf0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class GridCacheSizeTopologyChangedTest extends GridCommonAbstractTest { + /** Grids count */ + private static int GRIDS_CNT = 15; + + /** Keys count */ + private static int KEYS_CNT = 10_000_000; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setAtomicityMode(ATOMIC); + + ccfg.setCacheMode(PARTITIONED); + + ccfg.setBackups(1); + + cfg.setCacheConfiguration(defaultCacheConfiguration()); + + return cfg; + } + + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception if failed + */ + public void testCacheSize() throws Exception { + Ignite g0 = startGrid(0); + //Ignite g0 = startGrids(GRIDS_CNT); + + final AtomicBoolean canceled = new AtomicBoolean(); + + final Random rnd = new Random(); + + final boolean[] status = new boolean[GRIDS_CNT]; + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + while(!canceled.get()) { + int idx = rnd.nextInt(GRIDS_CNT); + + if (idx > 0) { + boolean state = status[idx]; + + if (state) { + System.out.println("!!! STOP " + idx); + stopGrid(idx); + } + else { + System.out.println("!!! START " + idx); + + startGrid(idx); + } + + status[idx] = !state; + + U.sleep(2000); + } + } + return null; + } + }); + + try { + IgniteCache<Integer, Integer> cache = g0.cache(null); + + for (int i = 0; i < KEYS_CNT; i++) { + cache.put(i, 0); + + int size = cache.size(); + + if (i % 1000 == 0) + System.out.println("!!! Key: " + i + ", size: " + size); + + if (size == -1) + System.out.println("!!! SIZE: " + size); + } + + canceled.set(true); + + Thread.sleep(5000); + + System.out.println("!!! ASSERT"); + assertEquals(KEYS_CNT, cache.size()); + } + catch (Exception e) { + System.out.println("!!!!!"); + e.printStackTrace(); + } + finally { + canceled.set(true); + fut.get(); + } + } + +}