IGNITE-621 - Fixing remap logic.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c2c90b52 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c2c90b52 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c2c90b52 Branch: refs/heads/ignite-950 Commit: c2c90b52972bec53919d97ec07d2aeab4d0d55e8 Parents: a9d0662 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Thu Jun 25 17:06:31 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Thu Jun 25 17:06:31 2015 -0700 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 3 + .../processors/cache/GridCacheAdapter.java | 2 +- .../processors/cache/GridCacheAtomicFuture.java | 12 ++- .../processors/cache/GridCacheMvccManager.java | 8 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 12 ++- .../dht/atomic/GridNearAtomicUpdateFuture.java | 88 ++++++++++++++++++-- .../communication/tcp/TcpCommunicationSpi.java | 2 +- ...eAtomicInvalidPartitionHandlingSelfTest.java | 7 +- 8 files changed, 110 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 542fa30..40fc873 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -343,6 +343,9 @@ public final class IgniteSystemProperties { /** Maximum size for affinity assignment history. */ public static final String IGNITE_AFFINITY_HISTORY_SIZE = "IGNITE_AFFINITY_HISTORY_SIZE"; + /** Number of cache operation retries in case of topology exceptions. */ + public static final String IGNITE_CACHE_RETRIES_COUNT = "IGNITE_CACHE_RETRIES_COUNT"; + /** * Enforces singleton. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index f993527..e138520 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -79,7 +79,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000; /** Maximum number of retries when topology changes. */ - public static final int MAX_RETRIES = 100; + public static final int MAX_RETRIES = IgniteSystemProperties.getInteger(IGNITE_CACHE_RETRIES_COUNT, 100); /** Deserialization stash. */ private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new ThreadLocal<IgniteBiTuple<String, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java index 35d3ec5..8724d3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.affinity.*; import java.util.*; @@ -26,14 +27,17 @@ import java.util.*; */ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> { /** - * @return {@code True} if partition exchange should wait for this future to complete. + * @return Future topology version. */ - public boolean waitForPartitionExchange(); + public AffinityTopologyVersion topologyVersion(); /** - * @return Future topology version. + * Gets future that will be completed when it is safe when update is finished on the given version of topology. + * + * @param topVer Topology version to finish. + * @return Future or {@code null} if no need to wait. */ - public AffinityTopologyVersion topologyVersion(); + public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer); /** * @return Future keys. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index c528e08..f24cf01 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -338,7 +338,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { public void addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut) { IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut); - assert old == null; + assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']'; } /** @@ -1002,8 +1002,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { res.ignoreChildFailures(ClusterTopologyCheckedException.class, CachePartialUpdateCheckedException.class); for (GridCacheAtomicFuture<?> fut : atomicFuts.values()) { - if (fut.waitForPartitionExchange() && fut.topologyVersion().compareTo(topVer) < 0) - res.add((IgniteInternalFuture<Object>)fut); + IgniteInternalFuture<Void> complete = fut.completeFuture(topVer); + + if (complete != null) + res.add((IgniteInternalFuture)complete); } res.markInitialized(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index ff8454e..37b57e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; @@ -170,13 +171,16 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> } /** {@inheritDoc} */ - @Override public boolean waitForPartitionExchange() { - return waitForExchange; + @Override public AffinityTopologyVersion topologyVersion() { + return updateReq.topologyVersion(); } /** {@inheritDoc} */ - @Override public AffinityTopologyVersion topologyVersion() { - return updateReq.topologyVersion(); + @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { + if (waitForExchange && topologyVersion().compareTo(topVer) < 0) + return this; + + return null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 536eb40..ea9b335 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -105,7 +105,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> private final ExpiryPolicy expiryPlc; /** Future map topology version. */ - private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; + private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; + + /** Completion future for a particular topology version. */ + private GridFutureAdapter<Void> topCompleteFut; /** Optional filter. */ private final CacheEntryPredicate[] filter; @@ -246,8 +249,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull()); } - /** {@inheritDoc} */ - @Override public boolean waitForPartitionExchange() { + /** + * @return {@code True} if this future should block partition map exchange. + */ + private boolean waitForPartitionExchange() { // Wait fast-map near atomic update futures in CLOCK mode. return fastMap; } @@ -323,13 +328,36 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> else { topLocked = true; - // Cannot remap. - remapCnt.set(1); + synchronized (this) { + this.topVer = topVer; + + // Cannot remap. + remapCnt.set(1); + } map0(topVer, null, false, null); } } + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { + if (waitForPartitionExchange() && topologyVersion().compareTo(topVer) < 0) { + synchronized (this) { + if (this.topVer == AffinityTopologyVersion.ZERO) + return null; + + if (this.topVer.compareTo(topVer) < 0) { + if (topCompleteFut == null) + topCompleteFut = new GridFutureAdapter<>(); + + return topCompleteFut; + } + } + } + + return null; + } + /** * @param failed Keys to remap. */ @@ -339,14 +367,20 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> Collection<Object> remapKeys = new ArrayList<>(failed.size()); Collection<Object> remapVals = vals != null ? new ArrayList<>(failed.size()) : null; + Collection<GridCacheDrInfo> remapConflictPutVals = conflictPutVals != null ? new ArrayList<GridCacheDrInfo>(failed.size()) : null; + Collection<GridCacheVersion> remapConflictRmvVals = conflictRmvVals != null ? new ArrayList<GridCacheVersion>(failed.size()) : null; Iterator<?> keyIt = keys.iterator(); Iterator<?> valsIt = vals != null ? vals.iterator() : null; + Iterator<GridCacheDrInfo> conflictPutValsIt = conflictPutVals != null ? conflictPutVals.iterator() : null; + Iterator<GridCacheVersion> conflictRmvValsIt = conflictRmvVals != null ? conflictRmvVals.iterator() : null; for (Object key : failed) { while (keyIt.hasNext()) { Object nextKey = keyIt.next(); Object nextVal = valsIt != null ? valsIt.next() : null; + GridCacheDrInfo nextConflictPutVal = conflictPutValsIt != null ? conflictPutValsIt.next() : null; + GridCacheVersion nextConflictRmvVal = conflictRmvValsIt != null ? conflictRmvValsIt.next() : null; if (F.eq(key, nextKey)) { remapKeys.add(nextKey); @@ -354,6 +388,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (remapVals != null) remapVals.add(nextVal); + if (remapConflictPutVals != null) + remapConflictPutVals.add(nextConflictPutVal); + + if (remapConflictRmvVals != null) + remapConflictRmvVals.add(nextConflictRmvVal); + break; } } @@ -361,13 +401,29 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> keys = remapKeys; vals = remapVals; + conflictPutVals = remapConflictPutVals; + conflictRmvVals = remapConflictRmvVals; - mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f); single = null; futVer = null; err = null; opRes = null; - topVer = AffinityTopologyVersion.ZERO; + + GridFutureAdapter<Void> fut0; + + synchronized (this) { + mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f); + + topVer = AffinityTopologyVersion.ZERO; + + fut0 = topCompleteFut; + + topCompleteFut = null; + } + + if (fut0 != null) + fut0.onDone(); + singleNodeId = null; singleReq = null; fastMapRemap = false; @@ -405,6 +461,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (futVer != null) cctx.mvcc().removeAtomicFuture(version()); + GridFutureAdapter<Void> fut0; + + synchronized (this) { + fut0 = topCompleteFut; + } + + if (fut0 != null) + fut0.onDone(); + return true; } @@ -544,6 +609,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return; } + + synchronized (this) { + this.topVer = topVer; + } } finally { cache.topology().readUnlock(); @@ -559,7 +628,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> boolean remap = false; synchronized (this) { - if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) { + if (topVer != AffinityTopologyVersion.ZERO && + ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty())) { CachePartialUpdateCheckedException err0 = err; if (err0 != null) @@ -1040,7 +1110,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (err0 == null) err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); - List<Object> keys = new ArrayList<>(failedKeys.size()); + Collection<Object> keys = new ArrayList<>(failedKeys.size()); for (KeyCacheObject key : failedKeys) keys.add(key.value(cctx.cacheObjectContext(), false)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index addf243d..4ca2995 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -210,7 +210,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter public static final int DFLT_ACK_SND_THRESHOLD = 16; /** Default socket write timeout. */ - public static final long DFLT_SOCK_WRITE_TIMEOUT = 200; + public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000; /** No-op runnable. */ private static final IgniteRunnable NOOP = new IgniteRunnable() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java index 054a110..b255558 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java @@ -41,6 +41,7 @@ import org.jsr166.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; import static org.apache.ignite.cache.CacheMode.*; @@ -236,6 +237,8 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA System.err.println("FINISHED PUTS"); + GridCacheMapEntry.debug = true; + // Start put threads. IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Object>() { @Override public Object call() throws Exception { @@ -340,12 +343,12 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA } catch (AssertionError e) { if (r == 9) { - System.err.println("Failed to verify cache contents: " + e.getMessage()); + info("Failed to verify cache contents: " + e.getMessage()); throw e; } - System.err.println("Failed to verify cache contents, will retry: " + e.getMessage()); + info("Failed to verify cache contents, will retry: " + e.getMessage()); // Give some time to finish async updates. U.sleep(1000);