ignite-500 Cache load works incorrectly
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/acf86cf1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/acf86cf1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/acf86cf1 Branch: refs/heads/ignite-500 Commit: acf86cf16f738ab644e9387b98fb26e00f67bdf2 Parents: 18b4c39 Author: agura <ag...@gridgain.com> Authored: Fri Apr 24 20:58:01 2015 +0300 Committer: agura <ag...@gridgain.com> Committed: Wed Apr 29 15:31:37 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 59 ++++++- .../distributed/dht/GridDhtCacheAdapter.java | 6 +- .../datastreamer/DataStreamerImpl.java | 75 ++++++++- .../datastreamer/DataStreamerUpdateJob.java | 4 + ...GridCacheLoadingConcurrentGridStartTest.java | 163 ++++++++++++++++--- .../ignite/testsuites/IgniteCacheTestSuite.java | 2 +- 6 files changed, 276 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/acf86cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 3f4e97b..6d51c70 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -3312,6 +3312,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (ttl == CU.TTL_ZERO) return; + if (!topVer.equals(ctx.topology().topologyVersion())) + throw new ClusterTopologyException("Topology changed"); + loadEntry(key, val, ver0, (IgniteBiPredicate<Object, Object>) p, topVer, replicate, ttl); } }, args); @@ -3531,17 +3534,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws IgniteCheckedException { - ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name()); - ctx.kernalContext().task().setThreadContext(TC_NO_FAILOVER, true); CacheOperationContext opCtx = ctx.operationContextPerCall(); ExpiryPolicy plc = opCtx != null ? opCtx.expiry() : null; - return ctx.kernalContext().closure().callAsync(BROADCAST, - Arrays.asList(new LoadCacheClosure<>(ctx.name(), p, args, plc)), - nodes.nodes()); + final LoadCacheClosure<K, V> loadClos = new LoadCacheClosure<>(ctx.name(), p, args, plc); + + return new GlobalLoadCacheFuture(loadClos, ctx); } /** @@ -5697,4 +5698,52 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V metrics.addPutAndGetTimeNanos(System.nanoTime() - start); } } + + /** + * Global load cache future. + */ + private static class GlobalLoadCacheFuture<K1, V1> extends GridFutureAdapter<Void> { + /** Load clos. */ + private final LoadCacheClosure<K1, V1> loadClos; + + /** Context. */ + private final GridCacheContext<K1, V1> ctx; + + /** + * @param loadClos Load cache closure. + * @param ctx Context. + */ + public GlobalLoadCacheFuture(LoadCacheClosure<K1, V1> loadClos, GridCacheContext<K1, V1> ctx) { + this.loadClos = loadClos; + this.ctx = ctx; + + init(); + } + + /** + * Inits future. + */ + private void init() { + ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name()); + + ComputeTaskInternalFuture<Collection<Void>> loadFut = ctx.kernalContext().closure().callAsync(BROADCAST, + Arrays.asList(loadClos), nodes.nodes()); + + loadFut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<Void>>>() { + @Override public void apply(IgniteInternalFuture<Collection<Void>> fut) { + try { + fut.get(); + + onDone(); + } + catch (Exception e) { + if (X.hasCause(e, ClusterTopologyException.class)) + init(); + else + onDone(null, fut.error()); + } + } + }); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/acf86cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 1c46fd0..b41538f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -410,10 +410,12 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Override public void apply(KeyCacheObject key, Object val, @Nullable GridCacheVersion ver) { assert ver == null; + if (!topVer.equals(topology().topologyVersion())) + throw new ClusterTopologyException("Topology changed"); + loadEntry(key, val, ver0, p, topVer, replicate, plc); } }, args); - } finally { if (p instanceof GridLoadCacheCloseablePredicate) @@ -476,7 +478,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } } else if (log.isDebugEnabled()) - log.debug("Will node load entry into cache (partition is invalid): " + part); + log.debug("Will node load entry into cache (partition is invalid): " + part); } catch (GridDhtInvalidPartitionException e) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/acf86cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index a69e033..5b37273 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -28,7 +28,6 @@ import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.cacheobject.*; import org.apache.ignite.internal.processors.dr.*; @@ -40,6 +39,7 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.stream.*; + import org.jetbrains.annotations.*; import org.jsr166.*; @@ -59,6 +59,9 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; */ @SuppressWarnings("unchecked") public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed { + /** Debug map. */ + public static final ConcurrentMap<Integer, Collection<DebugInfo>> DEBUG_MAP = new ConcurrentHashMap<>(); + /** Isolated receiver. */ private static final StreamReceiver ISOLATED_UPDATER = new IsolatedUpdater(); @@ -1419,9 +1422,27 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed topVer, GridDrType.DR_LOAD); + Integer key = entry.key().value(null, false); + + Collection<DebugInfo> debugInfos = DEBUG_MAP.get(key); + + if (debugInfos == null) { + Collection<DebugInfo> oldDebugInfo = + DEBUG_MAP.putIfAbsent(key, debugInfos = new ConcurrentLinkedQueue()); + + if (oldDebugInfo != null) + debugInfos = oldDebugInfo; + } + + DebugInfo debugInfo = new DebugInfo(topVer, cctx.nodeId(), cctx.cache().affinity().partition(key), + cctx.cache().affinity().isPrimary(cctx.localNode(), key), + cctx.cache().affinity().isBackup(cctx.localNode(), key)); + + debugInfos.add(debugInfo); + cctx.evicts().touch(entry, topVer); } - catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) { + catch (GridCacheEntryRemovedException ignored) { // No-op. } catch (IgniteCheckedException ex) { @@ -1432,4 +1453,54 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } } } + + /** + * Debug info. + */ + public static class DebugInfo { + /** Timestamp. */ + public long ts = U.currentTimeMillis(); + + /** Topology version. */ + public AffinityTopologyVersion topVer; + + /** Node id. */ + public UUID nodeId; + + /** Partition. */ + public int part; + + /** Primary. */ + public boolean primary; + + /** Backup. */ + public boolean backup; + + /** + * @param topVer Topology version. + * @param nodeId Node id. + * @param part Partition. + * @param primary Primary. + * @param backup Backup. + */ + public DebugInfo(AffinityTopologyVersion topVer, UUID nodeId, int part, boolean primary, boolean backup) { + this.topVer = topVer; + this.nodeId = nodeId; + this.part = part; + this.primary = primary; + this.backup = backup; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "DebugInfo{" + + "ts=" + ts + + ", topVer=" + topVer + + ", nodeId=" + nodeId + + ", part=" + part + + ", primary=" + primary + + ", backup=" + backup + + '}'; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/acf86cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java index 21ba3ac..e865aa2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.datastreamer; import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.stream.*; @@ -124,6 +125,9 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { return null; } + catch (GridDhtInvalidPartitionException e) { + throw new IgniteCheckedException(e); + } finally { if (ignoreDepOwnership) cache.context().deploy().ignoreOwnership(false); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/acf86cf1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLoadingConcurrentGridStartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLoadingConcurrentGridStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLoadingConcurrentGridStartTest.java index 2f9bb96..36cf0bd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLoadingConcurrentGridStartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLoadingConcurrentGridStartTest.java @@ -18,61 +18,70 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.datastreamer.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; + import org.jetbrains.annotations.*; import javax.cache.*; import javax.cache.configuration.*; import javax.cache.integration.*; +import java.io.*; import java.util.concurrent.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheRebalanceMode.*; /** * Tests for cache data loading during simultaneous grids start. */ public class GridCacheLoadingConcurrentGridStartTest extends GridCommonAbstractTest { + /** Ip finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + /** Grids count */ - private static int GRIDS_CNT = 5; + private static int GRIDS_CNT = 10; /** Keys count */ - private static int KEYS_CNT = 1_000_000; + private static int KEYS_CNT = 100000; /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(spi); + CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setCacheMode(PARTITIONED); - ccfg.setBackups(1); + ccfg.setAtomicityMode(ATOMIC); - CacheStore<Integer, String> store = new CacheStoreAdapter<Integer, String>() { - @Override public void loadCache(IgniteBiInClosure<Integer, String> f, Object... args) { - for (int i = 0; i < KEYS_CNT; i++) - f.apply(i, Integer.toString(i)); - } + ccfg.setRebalanceMode(SYNC); - @Nullable @Override public String load(Integer i) throws CacheLoaderException { - return null; - } + ccfg.setBackups(1); - @Override public void write(Cache.Entry<? extends Integer, ? extends String> entry) throws CacheWriterException { - // No-op. - } + ccfg.setNearConfiguration(null); - @Override public void delete(Object o) throws CacheWriterException { - // No-op. - } - }; + CacheStore<Integer, String> store = new TestCacheStoreAdapter(); ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); @@ -92,9 +101,22 @@ public class GridCacheLoadingConcurrentGridStartTest extends GridCommonAbstractT public void testLoadCacheWithDataStreamer() throws Exception { IgniteInClosure<Ignite> f = new IgniteInClosure<Ignite>() { @Override public void apply(Ignite grid) { + try (IgniteDataStreamer<Integer, String> dataStreamer = grid.dataStreamer(null)) { - for (int i = 0; i < KEYS_CNT; i++) + dataStreamer.perNodeBufferSize(1024); + + for (int i = 0; i < KEYS_CNT; i++) { dataStreamer.addData(i, Integer.toString(i)); + + if (i % 100 == 0) { + try { + U.sleep(5); + } + catch (IgniteInterruptedCheckedException e) { + e.printStackTrace(); + } + } + } } } }; @@ -124,7 +146,13 @@ public class GridCacheLoadingConcurrentGridStartTest extends GridCommonAbstractT IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Ignite>() { @Override public Ignite call() throws Exception { - return startGridsMultiThreaded(1, GRIDS_CNT - 1); + for (int i = 1; i < GRIDS_CNT - 1; i++) { + startGrid(i); + + U.sleep(50); + } + + return startGridsMultiThreaded(GRIDS_CNT - 1, 1); } }); @@ -135,6 +163,45 @@ public class GridCacheLoadingConcurrentGridStartTest extends GridCommonAbstractT fut.get(); } + IgniteCache<Integer, String> cache = g0.cache(null); + + int missingCnt = 0; + + for (int i = 0; i < KEYS_CNT; i++) { + if (cache.get(i) == null) { + missingCnt++; + + System.out.println("----------------------------------------------------------------------------"); + System.out.println("!!! Lost key: " + i); + System.out.println("----------------------------------------------------------------------------"); + + for (DataStreamerImpl.DebugInfo debugInfo : DataStreamerImpl.DEBUG_MAP.get(i)) { + if (debugInfo == null) + continue; + + System.out.println(debugInfo); + + GridCacheContext cctx = ((IgniteCacheProxy)G.ignite(debugInfo.nodeId).cache(null)).context(); + + GridDhtPartitionTopology top = cctx.topology(); + + GridDhtLocalPartition histPart = + top.localPartition(debugInfo.part, debugInfo.topVer, false); + + GridDhtLocalPartition curPart = + top.localPartition(debugInfo.part, cctx.affinity().affinityTopologyVersion(), false); + + int expPart = cctx.affinity().partition(i); + + System.out.println("Checking: Part state was: " + (histPart == null ? null : histPart.state()) + ", " + + "Part state now: " + (curPart == null ? null : curPart.state()) + ", " + + "Current part: " + expPart); + } + } + } + + System.out.println("!!! Lost keys total: " + missingCnt); + assertCacheSize(); } @@ -142,13 +209,63 @@ public class GridCacheLoadingConcurrentGridStartTest extends GridCommonAbstractT private void assertCacheSize() { IgniteCache<Integer, String> cache = grid(0).cache(null); - assertEquals(KEYS_CNT, cache.size(CachePeekMode.PRIMARY)); + printStats(cache); + + assertEquals(KEYS_CNT, cache.size()); int total = 0; for (int i = 0; i < GRIDS_CNT; i++) - total += grid(i).cache(null).localSize(CachePeekMode.PRIMARY); + total += grid(i).cache(null).localSize(); assertEquals(KEYS_CNT, total); } + + /** + * @param cache Cache. + */ + private void printStats(IgniteCache<Integer, String> cache) { + System.out.println("!!! Cache size: " + cache.size()); + + System.out.println(); + + int total = 0; + + for (int i = 0; i < GRIDS_CNT; i++) { + int locSize = grid(i).cache(null).localSize(); + + System.out.println("!!! Local cache size(" + i + "): " + locSize); + + total += locSize; + } + + System.out.println("!!! Total cache size: " + total); + } + + /** + * Cache store adapter. + */ + private static class TestCacheStoreAdapter extends CacheStoreAdapter<Integer, String> implements Serializable { + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<Integer, String> f, Object... args) { + for (int i = 0; i < KEYS_CNT; i++) + f.apply(i, Integer.toString(i)); + } + + /** {@inheritDoc} */ + @Nullable @Override public String load(Integer i) throws CacheLoaderException { + return null; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends Integer, ? extends String> entry) + throws CacheWriterException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void delete(Object o) throws CacheWriterException { + // No-op. + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/acf86cf1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 6e70052..6f954cd 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -224,7 +224,7 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTest(new TestSuite(GridCacheDhtPreloadUnloadSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedAffinityFilterSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedPreloadLifecycleSelfTest.class)); -// suite.addTest(new TestSuite(GridCacheLoadingConcurrentGridStartTest.class)); TODO-ignite-500 + suite.addTest(new TestSuite(GridCacheLoadingConcurrentGridStartTest.class)); suite.addTest(new TestSuite(GridCacheDhtPreloadDelayedSelfTest.class)); suite.addTest(new TestSuite(GridPartitionedBackupLoadSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedLoadCacheSelfTest.class));