Repository: incubator-ignite Updated Branches: refs/heads/ignite-237 5a76d70f9 -> 9efcd8035
IGNITE-136 Clear local store for entry from swap. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f33c0748 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f33c0748 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f33c0748 Branch: refs/heads/ignite-237 Commit: f33c07486ff932672b838604af3acb67397f34fc Parents: 6ba6090 Author: nikolay_tikhonov <ntikho...@gridgain.com> Authored: Wed Feb 25 13:37:33 2015 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Wed Feb 25 13:44:46 2015 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtLocalPartition.java | 9 +- .../GridCacheAbstractLocalStoreSelfTest.java | 118 ++++++++++++++----- .../GridCachePartitionedLocalStoreSelfTest.java | 2 +- ...chePartitionedOffHeapLocalStoreSelfTest.java | 56 +++++++++ .../GridCacheReplicatedLocalStoreSelfTest.java | 2 +- ...ridCacheTxPartitionedLocalStoreSelfTest.java | 2 +- .../ignite/testsuites/IgniteCacheTestSuite.java | 3 +- 7 files changed, 157 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f33c0748/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 93a6012..e3efade 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -488,6 +488,8 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti try { GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry<V>>> it = cctx.swap().iterator(id, false); + boolean isLocStore = cctx.store().isLocalStore(); + if (it != null) { // We can safely remove these values because no entries will be created for evicted partition. while (it.hasNext()) { @@ -498,6 +500,9 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti K key = cctx.marshaller().unmarshal(keyBytes, cctx.deploy().globalLoader()); cctx.swap().remove(key, keyBytes); + + if (isLocStore) + cctx.store().removeFromStore(null, key); } } } @@ -531,7 +536,7 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti boolean rec = cctx.events().isRecordable(EVT_CACHE_PRELOAD_OBJECT_UNLOADED); - boolean locStore = cctx.store().isLocalStore(); + boolean isLocStore = cctx.store().isLocalStore(); for (Iterator<GridDhtCacheEntry<K, V>> it = map.values().iterator(); it.hasNext();) { GridDhtCacheEntry<K, V> cached = it.next(); @@ -540,7 +545,7 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti if (cached.clearInternal(clearVer, swap)) { it.remove(); - if (locStore) + if (isLocStore) cctx.store().removeFromStore(null, cached.key()); if (!cached.isInternal()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f33c0748/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java index ed1b889..9bf9bc4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache; +import com.google.common.collect.*; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.store.*; @@ -37,6 +38,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import static org.apache.ignite.cache.CacheMemoryMode.*; import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CachePreloadMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; @@ -100,13 +102,16 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst LOCAL_STORE_3.clear(); } + /** + * + */ private CacheConfiguration cache(String gridName, String cacheName, int backups) { CacheConfiguration cacheCfg = new CacheConfiguration(); cacheCfg.setName(cacheName); cacheCfg.setCacheMode(getCacheMode()); cacheCfg.setAtomicityMode(getAtomicMode()); - cacheCfg.setDistributionMode(getDisrtMode()); + cacheCfg.setDistributionMode(getDistributionMode()); cacheCfg.setWriteSynchronizationMode(FULL_SYNC); cacheCfg.setPreloadMode(SYNC); @@ -120,6 +125,11 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst cacheCfg.setWriteThrough(true); cacheCfg.setReadThrough(true); cacheCfg.setBackups(backups); + cacheCfg.setOffHeapMaxMemory(0); + cacheCfg.setSwapEnabled(true); + + if (isOffHeapTiredMode()) + cacheCfg.setMemoryMode(OFFHEAP_TIERED); return cacheCfg; } @@ -127,7 +137,7 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst /** * @return Distribution mode. */ - protected abstract CacheDistributionMode getDisrtMode(); + protected abstract CacheDistributionMode getDistributionMode(); /** * @return Cache atomicity mode. @@ -139,6 +149,13 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst */ protected abstract CacheMode getCacheMode(); + /** + * @return Cache memory mode. + */ + protected boolean isOffHeapTiredMode() { + return false; + } + /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { stopAllGrids(); @@ -152,57 +169,47 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst IgniteCache<Object, Object> cache = ignite1.jcache(null); + // Populate cache and check that local store has all value. for (int i = 0; i < KEYS; i++) cache.put(i, i); - for (int i = 0; i < KEYS; i++) - assertEquals(LOCAL_STORE_1.load(i).get1().intValue(), i); + checkLocalStore(ignite1, LOCAL_STORE_1); - final CountDownLatch startPartExchange = new CountDownLatch(1); - final AtomicBoolean eventOcr = new AtomicBoolean(true); + final CountDownLatch partExchanged = new CountDownLatch(1); + + final int[] leftPartition = new int[1]; if (getCacheMode() != REPLICATED) { ignite1.events().localListen(new IgnitePredicate<Event>() { + private AtomicInteger eventCnt = new AtomicInteger(0); + @Override public boolean apply(Event event) { - startPartExchange.countDown(); - - eventOcr.set(true); - + if (leftPartition[0] - eventCnt.incrementAndGet() == 0) + partExchanged.countDown(); + return true; } - }, EventType.EVT_CACHE_PRELOAD_OBJECT_UNLOADED); + }, EventType.EVT_CACHE_PRELOAD_PART_UNLOADED); } Ignite ignite2 = startGrid(2); + // Partition count which must be transferred to 2'nd node. + leftPartition[0] = ignite2.affinity(null).allPartitions(ignite2.cluster().localNode()).length; + assertEquals(Ignition.allGrids().size(), 2); // Wait when partition unloaded. - waitExpirePartition(startPartExchange, eventOcr); + if (getCacheMode() != REPLICATED) + assert partExchanged.await(2, TimeUnit.SECONDS); checkLocalStore(ignite1, LOCAL_STORE_1); checkLocalStore(ignite2, LOCAL_STORE_2); } /** - * Wait when partition unloaded. + * @throws Exception If failed. */ - private void waitExpirePartition(CountDownLatch startPartExchange, AtomicBoolean eventOcr) throws Exception { - if (getCacheMode() != REPLICATED) { - assert startPartExchange.await(2, TimeUnit.SECONDS); - - while (true) { - if (eventOcr.get()) { - eventOcr.set(false); - - TimeUnit.MILLISECONDS.sleep(100); - } - else - break; - } - } - } - public void testBackupNode() throws Exception { Ignite ignite1 = startGrid(1); @@ -241,6 +248,59 @@ public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbst } /** + * @throws Exception If failed. + */ + public void testSwap() throws Exception { + Ignite ignite1 = startGrid(1); + + IgniteCache<Object, Object> cache = ignite1.jcache(null); + + // Populate cache and check that local store has all value. + for (int i = 0; i < KEYS; i++) + cache.put(i, i); + + checkLocalStore(ignite1, LOCAL_STORE_1); + + // Push entry to swap. + for (int i = 0; i < KEYS; i++) + cache.localEvict(Lists.newArrayList(i)); + + for (int i = 0; i < KEYS; i++) + assertNull(cache.localPeek(i, CachePeekMode.ONHEAP)); + + final CountDownLatch partExchanged = new CountDownLatch(1); + + final int[] leftPartition = new int[1]; + + if (getCacheMode() != REPLICATED) { + ignite1.events().localListen(new IgnitePredicate<Event>() { + private AtomicInteger eventCnt = new AtomicInteger(0); + + @Override public boolean apply(Event event) { + if (leftPartition[0] - eventCnt.incrementAndGet() == 0) + partExchanged.countDown(); + + return true; + } + }, EventType.EVT_CACHE_PRELOAD_PART_UNLOADED); + } + + Ignite ignite2 = startGrid(2); + + // Partition count which must be transferred to 2'nd node. + leftPartition[0] = ignite2.affinity(null).allPartitions(ignite2.cluster().localNode()).length; + + assertEquals(Ignition.allGrids().size(), 2); + + // Wait when partition unloaded. + if (getCacheMode() != REPLICATED) + assert partExchanged.await(2, TimeUnit.SECONDS); + + checkLocalStore(ignite1, LOCAL_STORE_1); + checkLocalStore(ignite2, LOCAL_STORE_2); + } + + /** * Check that local stores contains only primary entry. */ private void checkLocalStore(Ignite ignite, CacheStore<Integer, IgniteBiTuple<Integer, ?>> store) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f33c0748/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedLocalStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedLocalStoreSelfTest.java index d2dfcf0..4217531 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedLocalStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedLocalStoreSelfTest.java @@ -35,7 +35,7 @@ public class GridCachePartitionedLocalStoreSelfTest extends GridCacheAbstractLoc } /** {@inheritDoc} */ - @Override protected CacheDistributionMode getDisrtMode() { + @Override protected CacheDistributionMode getDistributionMode() { return PARTITIONED_ONLY; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f33c0748/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedOffHeapLocalStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedOffHeapLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedOffHeapLocalStoreSelfTest.java new file mode 100644 index 0000000..6dfc977 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedOffHeapLocalStoreSelfTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class GridCachePartitionedOffHeapLocalStoreSelfTest extends GridCacheAbstractLocalStoreSelfTest { + /** + * + */ + public GridCachePartitionedOffHeapLocalStoreSelfTest() { + super(); + } + + /** {@inheritDoc} */ + @Override protected CacheDistributionMode getDistributionMode() { + return PARTITIONED_ONLY; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode getAtomicMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected CacheMode getCacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected boolean isOffHeapTiredMode() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f33c0748/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedLocalStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedLocalStoreSelfTest.java index 2d43d13..56f3f1a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedLocalStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedLocalStoreSelfTest.java @@ -35,7 +35,7 @@ public class GridCacheReplicatedLocalStoreSelfTest extends GridCacheAbstractLoca } /** {@inheritDoc} */ - @Override protected CacheDistributionMode getDisrtMode() { + @Override protected CacheDistributionMode getDistributionMode() { return PARTITIONED_ONLY; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f33c0748/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxPartitionedLocalStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxPartitionedLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxPartitionedLocalStoreSelfTest.java index e031102..113bac3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxPartitionedLocalStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxPartitionedLocalStoreSelfTest.java @@ -35,7 +35,7 @@ public class GridCacheTxPartitionedLocalStoreSelfTest extends GridCacheAbstractL } /** {@inheritDoc} */ - @Override protected CacheDistributionMode getDisrtMode() { + @Override protected CacheDistributionMode getDistributionMode() { return PARTITIONED_ONLY; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f33c0748/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 28c9b57..6b3eb4a 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -140,7 +140,8 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(GridCacheGlobalLoadTest.class); suite.addTestSuite(GridCachePartitionedLocalStoreSelfTest.class); suite.addTestSuite(GridCacheReplicatedLocalStoreSelfTest.class); - //suite.addTestSuite(GridCacheTxPartitionedLocalStoreSelfTest.class); TODO GG-9762 + suite.addTestSuite(GridCachePartitionedOffHeapLocalStoreSelfTest.class); + suite.addTestSuite(GridCacheTxPartitionedLocalStoreSelfTest.class); // Heuristic exception handling. TODO IGNITE-257 // suite.addTestSuite(GridCacheColocatedTxExceptionSelfTest.class);