Repository: incubator-ignite Updated Branches: refs/heads/ignite-1.3.3-p3 8ced20733 -> 8f1c1c0d7
IGNITE-1265 - Added ready future to ClusterTopologyException, added test for correct explicit transaction retries. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8f1c1c0d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8f1c1c0d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8f1c1c0d Branch: refs/heads/ignite-1.3.3-p3 Commit: 8f1c1c0d7406f9b43681a6da69560c8ba6bbb737 Parents: 8ced207 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Wed Aug 19 22:55:48 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Wed Aug 19 22:55:48 2015 -0700 ---------------------------------------------------------------------- .../cluster/ClusterTopologyException.java | 18 ++ .../ClusterTopologyCheckedException.java | 18 ++ .../cache/GridCacheSharedContext.java | 15 ++ .../colocated/GridDhtColocatedLockFuture.java | 8 +- .../distributed/near/GridNearLockFuture.java | 8 +- .../near/GridNearOptimisticTxPrepareFuture.java | 7 +- .../GridNearPessimisticTxPrepareFuture.java | 7 +- .../ignite/internal/util/IgniteUtils.java | 9 +- .../IgniteCachePutRetryAbstractSelfTest.java | 1 + ...gniteCachePutRetryTransactionalSelfTest.java | 179 +++++++++++++++++++ 10 files changed, 263 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java index d28c409..61bc367 100644 --- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java +++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java @@ -18,6 +18,7 @@ package org.apache.ignite.cluster; import org.apache.ignite.*; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; /** @@ -27,6 +28,9 @@ public class ClusterTopologyException extends IgniteException { /** */ private static final long serialVersionUID = 0L; + /** Retry ready future. */ + private transient IgniteFuture<?> readyFut; + /** * Creates new topology exception with given error message. * @@ -46,4 +50,18 @@ public class ClusterTopologyException extends IgniteException { public ClusterTopologyException(String msg, @Nullable Throwable cause) { super(msg, cause); } + + /** + * @return Retry ready future. + */ + public IgniteFuture<?> retryReadyFuture() { + return readyFut; + } + + /** + * @param readyFut Retry ready future. + */ + public void retryReadyFuture(IgniteFuture<?> readyFut) { + this.readyFut = readyFut; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java index 8f985b4..2d7b0de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.cluster; import org.apache.ignite.*; +import org.apache.ignite.internal.*; import org.jetbrains.annotations.*; /** @@ -27,6 +28,9 @@ public class ClusterTopologyCheckedException extends IgniteCheckedException { /** */ private static final long serialVersionUID = 0L; + /** Next topology version to wait. */ + private transient IgniteInternalFuture<?> readyFut; + /** * Creates new topology exception with given error message. * @@ -46,4 +50,18 @@ public class ClusterTopologyCheckedException extends IgniteCheckedException { public ClusterTopologyCheckedException(String msg, @Nullable Throwable cause) { super(msg, cause); } + + /** + * @return Retry ready future. + */ + public IgniteInternalFuture<?> retryReadyFuture() { + return readyFut; + } + + /** + * @param readyFut Retry ready future. + */ + public void retryReadyFuture(IgniteInternalFuture<?> readyFut) { + this.readyFut = readyFut; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 7f4daff..f7763ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -444,6 +444,21 @@ public class GridCacheSharedContext<K, V> { } /** + * Gets ready future for the next affinity topology version (used in cases when a node leaves grid). + * + * @param curVer Current topology version (before a node left grid). + * @return Ready future. + */ + public IgniteInternalFuture<?> nextAffinityReadyFuture(AffinityTopologyVersion curVer) { + if (curVer == null) + return null; + + AffinityTopologyVersion nextVer = new AffinityTopologyVersion(curVer.topologyVersion() + 1); + + return exchMgr.affinityReadyFuture(nextVer); + } + + /** * @param tx Transaction to check. * @param activeCacheIds Active cache IDs. * @param cacheCtx Cache context. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index c784948..90ca8df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -1125,8 +1125,12 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture * @return Topology exception with user-friendly message. */ private ClusterTopologyCheckedException newTopologyException(@Nullable Throwable nested, UUID nodeId) { - return new ClusterTopologyCheckedException("Failed to acquire lock for keys (primary node left grid, " + - "retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested); + ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to acquire lock for keys " + + "(primary node left grid, retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested); + + topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer.get())); + + return topEx; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index b7e0d73..2815194 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -1232,8 +1232,12 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean * @return Topology exception with user-friendly message. */ private ClusterTopologyCheckedException newTopologyException(@Nullable Throwable nested, UUID nodeId) { - return new ClusterTopologyCheckedException("Failed to acquire lock for keys (primary node left grid, " + - "retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested); + ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to acquire lock for keys " + + "(primary node left grid, retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested); + + topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer.get())); + + return topEx; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 2b86672..4bb4c67 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -100,7 +100,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd MiniFuture f = (MiniFuture) fut; if (f.node().id().equals(nodeId)) { - f.onResult(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId)); + ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " + + nodeId); + + e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); + + f.onResult(e); found = true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 7006114..3d43797 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -72,7 +72,12 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA MiniFuture f = (MiniFuture)fut; if (f.node().id().equals(nodeId)) { - f.onNodeLeft(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId)); + ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " + + nodeId); + + e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); + + f.onNodeLeft(e); found = true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 3366256..e259084 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.mxbean.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.transactions.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.io.*; import org.apache.ignite.internal.util.ipc.shmem.*; import org.apache.ignite.internal.util.lang.*; @@ -580,7 +581,13 @@ public abstract class IgniteUtils { m.put(ClusterTopologyCheckedException.class, new C1<IgniteCheckedException, IgniteException>() { @Override public IgniteException apply(IgniteCheckedException e) { - return new ClusterTopologyException(e.getMessage(), e); + ClusterTopologyException topEx = new ClusterTopologyException(e.getMessage(), e); + + ClusterTopologyCheckedException checked = (ClusterTopologyCheckedException)e; + + topEx.retryReadyFuture(new IgniteFutureImpl<>(checked.retryReadyFuture())); + + return topEx; } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java index 9abc5c8..6624f8e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java @@ -54,6 +54,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr cfg.setAtomicWriteOrderMode(writeOrderMode()); cfg.setBackups(1); + cfg.setRebalanceMode(CacheRebalanceMode.SYNC); return cfg; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java index 91c454a..9a6bb31 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java @@ -19,17 +19,29 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.testframework.*; +import org.apache.ignite.transactions.*; +import javax.cache.*; +import javax.cache.processor.*; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; + /** * */ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetryAbstractSelfTest { + /** */ + private static final int FACTOR = 1000; + /** {@inheritDoc} */ @Override protected CacheAtomicityMode atomicityMode() { return CacheAtomicityMode.TRANSACTIONAL; @@ -71,4 +83,171 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr finished.set(true); fut.get(); } + + /** {@inheritDoc} */ + public void testExplicitTransactionRetries() throws Exception { + final AtomicInteger idx = new AtomicInteger(); + int threads = 8; + + final AtomicReferenceArray<Exception> err = new AtomicReferenceArray<>(threads); + + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override + public Object call() throws Exception { + int th = idx.getAndIncrement(); + int base = th * FACTOR; + + Ignite ignite = ignite(0); + final IgniteCache<Object, Object> cache = ignite.cache(null); + + try { + for (int i = 0; i < FACTOR; i++) { + doInTransaction(ignite, new ProcessCallable(cache, base, i)); + + if (i > 0 && i % 500 == 0) + info("Done: " + i); + } + } + catch (Exception e) { + err.set(th, e); + } + + return null; + } + }, threads, "tx-runner"); + + while (!fut.isDone()) { + int stopIdx = ThreadLocalRandom.current().nextInt(2, 4); // Random in [2, 3]. + + stopGrid(stopIdx); + + startGrid(stopIdx); + } + + for (int i = 0; i < threads; i++) { + Exception error = err.get(i); + + if (error != null) + throw error; + } + + // Verify contents of the cache. + for (int g = 0; g < gridCount(); g++) { + IgniteCache<Object, Object> cache = ignite(g).cache(null); + + for (int th = 0; th < threads; th++) { + int base = th * FACTOR; + + String key = "key-" + base; + + Set<String> set = (Set<String>)cache.get(key); + + assertNotNull("Missing set for key: " + key, set); + assertEquals(FACTOR, set.size()); + + for (int i = 0; i < FACTOR; i++) { + assertEquals("value-" + i, cache.get("key-" + base + "-" + i)); + assertTrue(set.contains("value-" + i)); + } + } + } + } + + /** + * @param ignite Ignite instance. + * @param clo Closure. + * @return Result of closure execution. + * @throws Exception + */ + private <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception { + while (true) { + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + T res = clo.call(); + + tx.commit(); + + return res; + } + catch (CacheException e) { + if (e.getCause() instanceof ClusterTopologyException) { + ClusterTopologyException topEx = (ClusterTopologyException)e.getCause(); + + topEx.retryReadyFuture().get(); + } + else + throw e; + } + catch (ClusterTopologyException e) { + IgniteFuture<?> fut = e.retryReadyFuture(); + + fut.get(); + } + catch (TransactionRollbackException ignore) { + // Safe to retry right away. + } + } + } + + /** + * Callable to process inside transaction. + */ + private static class ProcessCallable implements Callable<Void> { + /** */ + private IgniteCache cache; + + /** */ + private int base; + + /** */ + private int i; + + /** + * @param cache Cache. + * @param base Base index. + * @param i Iteration index. + */ + private ProcessCallable(IgniteCache<Object, Object> cache, int base, int i) { + this.cache = cache; + this.base = base; + this.i = i; + } + + /** {@inheritDoc} */ + @Override public Void call() throws Exception { + ((IgniteCache<String, String>)cache).put("key-" + base + "-" + i, "value-" + i); + + ((IgniteCache<String, Set<String>>)cache).invoke("key-" + base, new AddEntryProcessor("value-" + i)); + + return null; + } + } + + /** + * + */ + private static class AddEntryProcessor implements CacheEntryProcessor<String, Set<String>, Void> { + /** */ + private String addVal; + + /** + * @param addVal Value to add. + */ + private AddEntryProcessor(String addVal) { + this.addVal = addVal; + } + + /** {@inheritDoc} */ + @Override public Void process(MutableEntry<String, Set<String>> entry, Object... arguments) throws EntryProcessorException { + Set<String> set = entry.getValue(); + + if (set == null) + set = new HashSet<>(); + + set.add(addVal); + + entry.setValue(set); + + return null; + } + } }