Repository: incubator-ignite Updated Branches: refs/heads/ignite-136 [created] c3b7db8ea
IGNITE-136 Entry remove from local store when partitioned evicted and entries putted to local when partitioned exchange occurred. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c3b7db8e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c3b7db8e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c3b7db8e Branch: refs/heads/ignite-136 Commit: c3b7db8eacb778d6ab303265941f59cf5094115a Parents: e86c69e Author: nikolay_tikhonov <ntikho...@gridgain.com> Authored: Fri Feb 20 13:05:16 2015 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Tue Feb 24 10:31:35 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 9 + .../distributed/dht/GridDhtLocalPartition.java | 3 + .../GridCacheAbstractLocalStoreSelfTest.java | 314 +++++++++++++++++++ .../GridCachePartitionedLocalStoreSelfTest.java | 51 +++ .../GridCacheReplicatedLocalStoreSelfTest.java | 51 +++ ...ridCacheTxPartitionedLocalStoreSelfTest.java | 51 +++ .../ignite/testsuites/IgniteCacheTestSuite.java | 3 + 7 files changed, 482 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c3b7db8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 18fce53..c9e95d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -3130,6 +3130,15 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> cctx.dataStructures().onEntryUpdated(key, false); } + if (cctx.store().isLocalStore()) { + if (val != null || valBytes != null) { + if (val == null) + val = cctx.marshaller().<V>unmarshal(valBytes, cctx.deploy().globalLoader()); + + cctx.store().putToStore(null, key, val, ver); + } + } + return true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c3b7db8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index af63307..f8af7dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -538,6 +538,9 @@ public class GridDhtLocalPartition<K, V> implements Comparable<GridDhtLocalParti if (cached.clearInternal(clearVer, swap)) { it.remove(); + if (cctx.store().isLocalStore()) + cctx.store().removeFromStore(null, cached.key()); + if (!cached.isInternal()) { mapPubSize.decrement(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c3b7db8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java new file mode 100644 index 0000000..ef33797 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.processors.cache.store.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import javax.cache.integration.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CachePreloadMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * + */ +public abstract class GridCacheAbstractLocalStoreSelfTest extends GridCommonAbstractTest { + /** */ + public static final TestLocalStore<Integer, Integer> LOCAL_STORE_1 = new TestLocalStore<>(); + + /** */ + public static final TestLocalStore<Integer, Integer> LOCAL_STORE_2 = new TestLocalStore<>(); + + /** */ + public static final TestLocalStore<Integer, Integer> LOCAL_STORE_3 = new TestLocalStore<>(); + + /** */ + public static final int KEYS = 1000; + + /** */ + public static final String BACKUP_CACHE = "backup"; + + /** + * + */ + public GridCacheAbstractLocalStoreSelfTest() { + super(false /* doesn't start grid */); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration cacheCfg = cache(gridName, null, 0); + + CacheConfiguration cacheBackupCfg = cache(gridName, BACKUP_CACHE, 2); + + cfg.setCacheConfiguration(cacheCfg, cacheBackupCfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + private CacheConfiguration cache(String gridName, String cacheName, int backups) { + CacheConfiguration cacheCfg = new CacheConfiguration(); + + cacheCfg.setName(cacheName); + cacheCfg.setCacheMode(getCacheMode()); + cacheCfg.setAtomicityMode(getAtomicMode()); + cacheCfg.setDistributionMode(getDisrtMode()); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + cacheCfg.setPreloadMode(SYNC); + + if (gridName.endsWith("1")) + cacheCfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory<CacheStore>(LOCAL_STORE_1)); + else if (gridName.endsWith("2")) + cacheCfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory<CacheStore>(LOCAL_STORE_2)); + else + cacheCfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory<CacheStore>(LOCAL_STORE_3)); + + cacheCfg.setWriteThrough(true); + cacheCfg.setReadThrough(true); + cacheCfg.setWriteBehindBatchSize(1); + cacheCfg.setWriteBehindFlushFrequency(1); + cacheCfg.setWriteBehindFlushSize(1); + cacheCfg.setBackups(backups); + return cacheCfg; + } + + /** + * @return Distribution mode. + */ + protected abstract CacheDistributionMode getDisrtMode(); + + /** + * @return Cache atomicity mode. + */ + protected abstract CacheAtomicityMode getAtomicMode(); + + /** + * @return Cache mode. + */ + protected abstract CacheMode getCacheMode(); + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNode() throws Exception { + Ignite ignite1 = startGrid(1); + + IgniteCache<Object, Object> cache = ignite1.jcache(null); + + for (int i = 0; i < KEYS; i++) + cache.put(i, i); + + for (int i = 0; i < KEYS; i++) + assertEquals(LOCAL_STORE_1.load(i).get1().intValue(), i); + + final CountDownLatch startPartExchange = new CountDownLatch(1); + final AtomicBoolean eventOcr = new AtomicBoolean(true); + + if (getCacheMode() != REPLICATED) { + ignite1.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event event) { + startPartExchange.countDown(); + eventOcr.set(true); + + return true; + } + }, EventType.EVT_CACHE_PRELOAD_OBJECT_UNLOADED); + } + + Ignite ignite2 = startGrid(2); + + assertEquals(Ignition.allGrids().size(), 2); + + // Wait when partition unloaded. + waitExpirePartition(startPartExchange, eventOcr); + + checkLocalStore(ignite1, LOCAL_STORE_1); + checkLocalStore(ignite2, LOCAL_STORE_2); + } + + /** + * Wait when partition unloaded. + */ + private void waitExpirePartition(CountDownLatch startPartExchange, AtomicBoolean eventOcr) throws Exception { + if (getCacheMode() != REPLICATED) { + assert startPartExchange.await(1, TimeUnit.SECONDS); + + while (true) { + if (eventOcr.get()) { + eventOcr.set(false); + + TimeUnit.MILLISECONDS.sleep(100); + } + else + break; + } + } + } + + public void testBackupNode() throws Exception { + Ignite ignite1 = startGrid(1); + + IgniteCache<Object, Object> cache = ignite1.jcache(BACKUP_CACHE); + + for (int i = 0; i < KEYS; i++) + cache.put(i, i); + + for (int i = 0; i < KEYS; i++) + assertEquals(LOCAL_STORE_1.load(i).get1().intValue(), i); + + // Start 2'nd node. + Ignite ignite2 = startGrid(2); + + assertEquals(2, Ignition.allGrids().size()); + + checkLocalStoreForBackup(ignite2, LOCAL_STORE_2); + + // Start 3'nd node. + Ignite ignite3 = startGrid(3); + + assertEquals(Ignition.allGrids().size(), 3); + + for (int i = 0; i < KEYS; i++) + cache.put(i, i * 3); + + checkLocalStoreForBackup(ignite2, LOCAL_STORE_2); + checkLocalStoreForBackup(ignite3, LOCAL_STORE_3); + + // Stop 3'nd node. + stopGrid(3, true); + + assertEquals(Ignition.allGrids().size(), 2); + + checkLocalStoreForBackup(ignite2, LOCAL_STORE_2); + } + + /** + * Check that local stores contains only primary entry. + */ + private void checkLocalStore(Ignite ignite, CacheStore<Integer, IgniteBiTuple<Integer, ?>> store) { + for (int i = 0; i < KEYS; i++) { + if (ignite.affinity(null).isPrimary(ignite.cluster().localNode(), i)) + assertEquals(store.load(i).get1().intValue(), i); + else if (!ignite.affinity(null).isPrimaryOrBackup(ignite.cluster().localNode(), i)) + assertNull(store.load(i)); + } + } + + /** + * Check that local stores contains only primary entry. + */ + private void checkLocalStoreForBackup(Ignite ignite, CacheStore<Integer, IgniteBiTuple<Integer, ?>> store) { + for (int i = 0; i < KEYS; i++) { + if (ignite.affinity(BACKUP_CACHE).isBackup(ignite.cluster().localNode(), i)) + assertEquals(store.load(i).get1().intValue(), i); + else if (!ignite.affinity(BACKUP_CACHE).isPrimaryOrBackup(ignite.cluster().localNode(), i)) + assertNull(store.load(i).get1()); + } + } + + /** + * + */ + @CacheLocalStore + public static class TestLocalStore<K, V> implements CacheStore<K, IgniteBiTuple<V, ?>> { + /** */ + private Map<K, IgniteBiTuple<V, ?>> map = new ConcurrentHashMap<>(); + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<K, IgniteBiTuple<V, ?>> clo, @Nullable Object... args) + throws CacheLoaderException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void txEnd(boolean commit) throws CacheWriterException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteBiTuple<V, ?> load(K key) throws CacheLoaderException { + return map.get(key); + } + + /** {@inheritDoc} */ + @Override public Map<K, IgniteBiTuple<V, ?>> loadAll(Iterable<? extends K> keys) throws CacheLoaderException { + Map<K, IgniteBiTuple<V, ?>> res = new HashMap<>(); + + for (K key : keys) { + IgniteBiTuple<V, ?> val = map.get(key); + + if (val != null) + res.put(key, val); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends K, ? extends IgniteBiTuple<V, ?>> entry) + throws CacheWriterException { + map.put(entry.getKey(), entry.getValue()); + } + + /** {@inheritDoc} */ + @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends IgniteBiTuple<V, ?>>> entries) + throws CacheWriterException { + for (Cache.Entry<? extends K, ? extends IgniteBiTuple<V, ?>> e : entries) + map.put(e.getKey(), e.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + map.remove(key); + } + + /** {@inheritDoc} */ + @Override public void deleteAll(Collection<?> keys) throws CacheWriterException { + for (Object key : keys) + map.remove(key); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c3b7db8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedLocalStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedLocalStoreSelfTest.java new file mode 100644 index 0000000..d2dfcf0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedLocalStoreSelfTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class GridCachePartitionedLocalStoreSelfTest extends GridCacheAbstractLocalStoreSelfTest { + /** + * + */ + public GridCachePartitionedLocalStoreSelfTest() { + super(); + } + + /** {@inheritDoc} */ + @Override protected CacheDistributionMode getDisrtMode() { + return PARTITIONED_ONLY; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode getAtomicMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected CacheMode getCacheMode() { + return PARTITIONED; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c3b7db8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedLocalStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedLocalStoreSelfTest.java new file mode 100644 index 0000000..2d43d13 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedLocalStoreSelfTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class GridCacheReplicatedLocalStoreSelfTest extends GridCacheAbstractLocalStoreSelfTest { + /** + * + */ + public GridCacheReplicatedLocalStoreSelfTest() { + super(); + } + + /** {@inheritDoc} */ + @Override protected CacheDistributionMode getDisrtMode() { + return PARTITIONED_ONLY; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode getAtomicMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected CacheMode getCacheMode() { + return REPLICATED; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c3b7db8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxPartitionedLocalStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxPartitionedLocalStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxPartitionedLocalStoreSelfTest.java new file mode 100644 index 0000000..e031102 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTxPartitionedLocalStoreSelfTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class GridCacheTxPartitionedLocalStoreSelfTest extends GridCacheAbstractLocalStoreSelfTest { + /** + * + */ + public GridCacheTxPartitionedLocalStoreSelfTest() { + super(); + } + + /** {@inheritDoc} */ + @Override protected CacheDistributionMode getDisrtMode() { + return PARTITIONED_ONLY; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode getAtomicMode() { + return TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected CacheMode getCacheMode() { + return PARTITIONED; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c3b7db8e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index df9ffab..b2f6137 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -138,6 +138,9 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(GridCacheOffHeapTieredAtomicSelfTest.class); suite.addTestSuite(GridCacheOffHeapTieredSelfTest.class); suite.addTestSuite(GridCacheGlobalLoadTest.class); + suite.addTestSuite(GridCachePartitionedLocalStoreSelfTest.class); + suite.addTestSuite(GridCacheReplicatedLocalStoreSelfTest.class); + //suite.addTestSuite(GridCacheTxPartitionedLocalStoreSelfTest.class); TODO GG-9762 // Heuristic exception handling. TODO IGNITE-257 // suite.addTestSuite(GridCacheColocatedTxExceptionSelfTest.class);