# ignite-63
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a916db69 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a916db69 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a916db69 Branch: refs/heads/ignite-63 Commit: a916db69a5d8eaf3b58cd09cbd9f394eed494899 Parents: 537f631 Author: sboikov <sboi...@gridgain.com> Authored: Fri Jan 23 12:28:28 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Jan 23 12:28:28 2015 +0300 ---------------------------------------------------------------------- .../GridCacheBatchEvictUnswapSelfTest.java | 193 ++++++++ ...heConcurrentEvictionConsistencySelfTest.java | 267 +++++++++++ .../GridCacheConcurrentEvictionsSelfTest.java | 183 ++++++++ .../GridCacheDistributedEvictionsSelfTest.java | 265 +++++++++++ .../GridCacheEmptyEntriesAbstractSelfTest.java | 304 ++++++++++++ .../GridCacheEmptyEntriesLocalSelfTest.java | 41 ++ ...ridCacheEmptyEntriesPartitionedSelfTest.java | 41 ++ .../eviction/GridCacheEvictionAbstractTest.java | 462 +++++++++++++++++++ .../GridCacheEvictionFilterSelfTest.java | 249 ++++++++++ .../GridCacheEvictionLockUnlockSelfTest.java | 175 +++++++ .../GridCacheEvictionTouchSelfTest.java | 347 ++++++++++++++ .../cache/eviction/GridCacheMockEntry.java | 365 +++++++++++++++ ...cheSynchronousEvictionsFailoverSelfTest.java | 160 +++++++ .../GridCacheFifoEvictionPolicySelfTest.java | 380 +++++++++++++++ .../lru/GridCacheLruEvictionPolicySelfTest.java | 426 +++++++++++++++++ .../GridCacheLruNearEvictionPolicySelfTest.java | 136 ++++++ ...heNearOnlyLruNearEvictionPolicySelfTest.java | 167 +++++++ .../GridCacheRandomEvictionPolicySelfTest.java | 265 +++++++++++ .../GridCacheEvictionSelfTestSuite.java | 8 +- .../GridCacheBatchEvictUnswapSelfTest.java | 193 -------- ...heConcurrentEvictionConsistencySelfTest.java | 267 ----------- .../GridCacheConcurrentEvictionsSelfTest.java | 183 -------- .../GridCacheDistributedEvictionsSelfTest.java | 265 ----------- .../GridCacheEmptyEntriesAbstractSelfTest.java | 304 ------------ .../GridCacheEmptyEntriesLocalSelfTest.java | 41 -- ...ridCacheEmptyEntriesPartitionedSelfTest.java | 41 -- .../eviction/GridCacheEvictionAbstractTest.java | 462 ------------------- .../GridCacheEvictionFilterSelfTest.java | 249 ---------- .../GridCacheEvictionLockUnlockSelfTest.java | 175 ------- .../GridCacheEvictionTouchSelfTest.java | 347 -------------- .../cache/eviction/GridCacheMockEntry.java | 365 --------------- ...cheSynchronousEvictionsFailoverSelfTest.java | 160 ------- .../GridCacheFifoEvictionPolicySelfTest.java | 380 --------------- .../lru/GridCacheLruEvictionPolicySelfTest.java | 426 ----------------- .../GridCacheLruNearEvictionPolicySelfTest.java | 136 ------ ...heNearOnlyLruNearEvictionPolicySelfTest.java | 167 ------- .../GridCacheRandomEvictionPolicySelfTest.java | 265 ----------- 37 files changed, 4430 insertions(+), 4430 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a916db69/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheBatchEvictUnswapSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheBatchEvictUnswapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheBatchEvictUnswapSelfTest.java new file mode 100644 index 0000000..4ba4d57 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheBatchEvictUnswapSelfTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.eviction; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.fifo.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheDistributionMode.*; + +/** + * Swap benchmark. + */ +@SuppressWarnings("BusyWait") +public class GridCacheBatchEvictUnswapSelfTest extends GridCacheAbstractSelfTest { + /** Eviction policy size. */ + public static final int EVICT_PLC_SIZE = 100000; + + /** Keys count. */ + public static final int KEYS_CNT = 100000; + + /** Batch size. */ + private static final int BATCH_SIZE = 200; + + /** Number of threads for concurrent benchmark + concurrency level. */ + private static final int N_THREADS = 8; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + // Let this test run 2 minutes as it runs for 20 seconds locally. + return 2 * 60 * 1000; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cacheCfg = super.cacheConfiguration(gridName); + + cacheCfg.setCacheMode(GridCacheMode.PARTITIONED); + + CacheStore store = new CacheStoreAdapter<Long, String>() { + @Nullable @Override public String load(Long key) { + return null; + } + + @Override public void loadCache(final IgniteBiInClosure<Long, String> c, + @Nullable Object... args) { + for (int i = 0; i < KEYS_CNT; i++) + c.apply((long)i, String.valueOf(i)); + } + + @Override public void write(Cache.Entry<? extends Long, ? extends String> val) { + // No-op. + } + + @Override public void delete(Object key) { + // No-op. + } + }; + + cacheCfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + cacheCfg.setReadThrough(true); + cacheCfg.setWriteThrough(true); + cacheCfg.setLoadPreviousValue(true); + + cacheCfg.setEvictionPolicy(new GridCacheFifoEvictionPolicy(EVICT_PLC_SIZE)); + cacheCfg.setSwapEnabled(true); + cacheCfg.setEvictSynchronized(false); + cacheCfg.setDistributionMode(PARTITIONED_ONLY); + + return cacheCfg; + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentEvictions() throws Exception { + runConcurrentTest(grid(0), KEYS_CNT, BATCH_SIZE); + } + + /** + * @param g Grid instance. + * @param keysCnt Number of keys to swap and promote. + * @param batchSize Size of batch to swap/promote. + * @throws Exception If failed. + */ + private void runConcurrentTest(Ignite g, final int keysCnt, final int batchSize) throws Exception { + assert keysCnt % batchSize == 0; + + final AtomicInteger evictedKeysCnt = new AtomicInteger(); + + final GridCache<Object, Object> cache = g.cache(null); + + cache.loadCache(null, 0); + + info("Finished load cache."); + + IgniteFuture<?> evictFut = multithreadedAsync(new Runnable() { + @Override public void run() { + Collection<Long> keys = new ArrayList<>(batchSize); + + int evictedBatches = 0; + + for (long i = 0; i < keysCnt; i++) { + keys.add(i); + + if (keys.size() == batchSize) { + cache.evictAll(keys); + + evictedKeysCnt.addAndGet(batchSize); + + keys.clear(); + + evictedBatches++; + + if (evictedBatches % 100 == 0 && evictedBatches > 0) + info("Evicted " + (evictedBatches * batchSize) + " entries."); + } + } + } + }, N_THREADS, "evict"); + + final AtomicInteger unswappedKeys = new AtomicInteger(); + + IgniteFuture<?> unswapFut = multithreadedAsync(new Runnable() { + @Override public void run() { + try { + Collection<Long> keys = new ArrayList<>(batchSize); + + int unswappedBatches = 0; + + for (long i = 0; i < keysCnt; i++) { + keys.add(i); + + if (keys.size() == batchSize) { + cache.promoteAll(keys); + + unswappedKeys.addAndGet(batchSize); + + keys.clear(); + + unswappedBatches++; + + if (unswappedBatches % 100 == 0 && unswappedBatches > 0) + info("Unswapped " + (unswappedBatches * batchSize) + " entries."); + } + } + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + } + } + }, N_THREADS, "promote"); + + evictFut.get(); + + unswapFut.get(); + + info("Clearing cache."); + + for (long i = 0; i < KEYS_CNT; i++) + cache.remove(i); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a916db69/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheConcurrentEvictionConsistencySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheConcurrentEvictionConsistencySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheConcurrentEvictionConsistencySelfTest.java new file mode 100644 index 0000000..a894074 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheConcurrentEvictionConsistencySelfTest.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.eviction; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.*; +import org.apache.ignite.cache.eviction.fifo.*; +import org.apache.ignite.cache.eviction.lru.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * + */ +public class GridCacheConcurrentEvictionConsistencySelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Default iteration count. */ + private static final int ITERATION_CNT = 50000; + + /** Size of policy internal queue. */ + private static final int POLICY_QUEUE_SIZE = 50; + + /** Tested policy. */ + private GridCacheEvictionPolicy<?, ?> plc; + + /** Key count to put into the cache. */ + private int keyCnt; + + /** Number of threads. */ + private int threadCnt = 50; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.getTransactionsConfiguration().setDefaultTxConcurrency(PESSIMISTIC); + c.getTransactionsConfiguration().setDefaultTxIsolation(READ_COMMITTED); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(LOCAL); + + cc.setSwapEnabled(false); + + cc.setWriteSynchronizationMode(FULL_SYNC); + + cc.setDistributionMode(PARTITIONED_ONLY); + + cc.setEvictionPolicy(plc); + + c.setCacheConfiguration(cc); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + return c; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 5 * 60 * 1000; // 5 min. + } + + /** + * @throws Exception If failed. + */ + public void testPolicyConsistencyFifoLocalTwoKeys() throws Exception { + plc = new GridCacheFifoEvictionPolicy<Object, Object>(1); + + keyCnt = 2; + threadCnt = 10; + + checkPolicyConsistency(); + } + + /** + * @throws Exception If failed. + */ + public void testPolicyConsistencyLruLocalTwoKeys() throws Exception { + plc = new GridCacheLruEvictionPolicy<Object, Object>(1); + + keyCnt = 2; + threadCnt = 10; + + checkPolicyConsistency(); + } + + /** + * @throws Exception If failed. + */ + public void testPolicyConsistencyFifoLocalFewKeys() throws Exception { + plc = new GridCacheFifoEvictionPolicy<Object, Object>(POLICY_QUEUE_SIZE); + + keyCnt = POLICY_QUEUE_SIZE + 5; + + checkPolicyConsistency(); + } + + /** + * @throws Exception If failed. + */ + public void testPolicyConsistencyLruLocalFewKeys() throws Exception { + plc = new GridCacheLruEvictionPolicy<Object, Object>(POLICY_QUEUE_SIZE); + + keyCnt = POLICY_QUEUE_SIZE + 5; + + checkPolicyConsistency(); + } + + /** + * @throws Exception If failed. + */ + public void testPolicyConsistencyFifoLocal() throws Exception { + plc = new GridCacheFifoEvictionPolicy<Object, Object>(POLICY_QUEUE_SIZE); + + keyCnt = POLICY_QUEUE_SIZE * 10; + + checkPolicyConsistency(); + } + + /** + * @throws Exception If failed. + */ + public void testPolicyConsistencyLruLocal() throws Exception { + plc = new GridCacheLruEvictionPolicy<Object, Object>(POLICY_QUEUE_SIZE); + + keyCnt = POLICY_QUEUE_SIZE * 10; + + checkPolicyConsistency(); + } + + /** + * @throws Exception If failed. + */ + private void checkPolicyConsistency() throws Exception { + try { + Ignite ignite = startGrid(1); + + final GridCache<Integer, Integer> cache = ignite.cache(null); + + long start = System.currentTimeMillis(); + + IgniteFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override + public Object call() throws Exception { + final Random rnd = new Random(); + + for (int i = 0; i < ITERATION_CNT; i++) { + + int j = rnd.nextInt(keyCnt); + + try (IgniteTx tx = cache.txStart()) { + // Put or remove? + if (rnd.nextBoolean()) + cache.putx(j, j); + else + cache.remove(j); + + tx.commit(); + } + + if (i != 0 && i % 10000 == 0) + info("Stats [iterCnt=" + i + ", size=" + cache.size() + ']'); + } + + return null; + } + }, + threadCnt + ); + + fut.get(); + + Collection<GridCacheEntry<Integer, Integer>> queue = internalQueue(plc); + + info("Test results [threadCnt=" + threadCnt + ", iterCnt=" + ITERATION_CNT + ", cacheSize=" + cache.size() + + ", internalQueueSize" + queue.size() + ", duration=" + (System.currentTimeMillis() - start) + ']'); + + for (GridCacheEntry<Integer, Integer> e : queue) { + Integer rmv = cache.remove(e.getKey()); + + if (rmv == null) + fail("Eviction policy contains key that is not present in cache: " + e); + else + info("Entry removed: " + rmv); + } + + if (!cache.isEmpty()) { + boolean zombies = false; + + for (GridCacheEntry<Integer, Integer> e : cache) { + U.warn(log, "Zombie entry: " + e); + + zombies = true; + } + + if (zombies) + fail("Cache contained zombie entries."); + } + else + info("Cache is empty after test."); + } + finally { + stopAllGrids(); + } + } + + /** + * Gets internal policy queue. + * + * @param plc Policy to get queue from. + * @return Internal entries collection. + */ + private Collection<GridCacheEntry<Integer, Integer>> internalQueue(GridCacheEvictionPolicy<?, ?> plc) { + if (plc instanceof GridCacheFifoEvictionPolicy) { + GridCacheFifoEvictionPolicy<Integer, Integer> plc0 = (GridCacheFifoEvictionPolicy<Integer, Integer>)plc; + + return plc0.queue(); + } + else if (plc instanceof GridCacheLruEvictionPolicy) { + GridCacheLruEvictionPolicy<Integer, Integer> plc0 = (GridCacheLruEvictionPolicy<Integer, Integer>)plc; + + return plc0.queue(); + } + + assert false : "Unexpected policy type: " + plc.getClass().getName(); + + return Collections.emptyList(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a916db69/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheConcurrentEvictionsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheConcurrentEvictionsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheConcurrentEvictionsSelfTest.java new file mode 100644 index 0000000..7c84ddb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheConcurrentEvictionsSelfTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.eviction; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.*; +import org.apache.ignite.cache.eviction.fifo.*; +import org.apache.ignite.cache.eviction.lru.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * + */ +public class GridCacheConcurrentEvictionsSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Replicated cache. */ + private GridCacheMode mode = REPLICATED; + + /** */ + private GridCacheEvictionPolicy<?, ?> plc; + + /** */ + private GridCacheEvictionPolicy<?, ?> nearPlc; + + /** */ + private int warmUpPutsCnt; + + /** */ + private int iterCnt; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.getTransactionsConfiguration().setDefaultTxConcurrency(PESSIMISTIC); + c.getTransactionsConfiguration().setDefaultTxIsolation(READ_COMMITTED); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(mode); + + cc.setSwapEnabled(false); + + cc.setWriteSynchronizationMode(FULL_SYNC); + + cc.setDistributionMode(PARTITIONED_ONLY); + + cc.setEvictionPolicy(plc); + cc.setNearEvictionPolicy(nearPlc); + + c.setCacheConfiguration(cc); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + plc = null; + nearPlc = null; + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentPutsFifoLocal() throws Exception { + mode = LOCAL; + plc = new GridCacheFifoEvictionPolicy<Object, Object>(1000); + nearPlc = null; + warmUpPutsCnt = 100000; + iterCnt = 100000; + + checkConcurrentPuts(); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentPutsLruLocal() throws Exception { + mode = LOCAL; + plc = new GridCacheLruEvictionPolicy<Object, Object>(1000); + nearPlc = null; + warmUpPutsCnt = 100000; + iterCnt = 100000; + + checkConcurrentPuts(); + } + + /** + * @throws Exception If failed. + */ + private void checkConcurrentPuts() throws Exception { + try { + Ignite ignite = startGrid(1); + + final GridCache<Integer, Integer> cache = ignite.cache(null); + + // Warm up. + for (int i = 0; i < warmUpPutsCnt; i++) { + cache.putx(i, i); + + if (i != 0 && i % 1000 == 0) + info("Warm up puts count: " + i); + } + + info("Cache size: " + cache.size()); + + cache.removeAll(); + + final AtomicInteger idx = new AtomicInteger(); + + int threadCnt = 30; + + long start = System.currentTimeMillis(); + + IgniteFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < iterCnt; i++) { + int j = idx.incrementAndGet(); + + cache.putx(j, j); + + if (i != 0 && i % 10000 == 0) + // info("Puts count: " + i); + info("Stats [putsCnt=" + i + ", size=" + cache.size() + ']'); + } + + return null; + } + }, + threadCnt + ); + + fut.get(); + + info("Test results [threadCnt=" + threadCnt + ", iterCnt=" + iterCnt + ", cacheSize=" + cache.size() + + ", duration=" + (System.currentTimeMillis() - start) + ']'); + } + finally { + stopAllGrids(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a916db69/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheDistributedEvictionsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheDistributedEvictionsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheDistributedEvictionsSelfTest.java new file mode 100644 index 0000000..eb1faba --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheDistributedEvictionsSelfTest.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.eviction; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.fifo.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * + */ +public class GridCacheDistributedEvictionsSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private int gridCnt = 2; + + /** */ + private GridCacheMode mode; + + /** */ + private boolean nearEnabled; + + /** */ + private boolean evictSync; + + /** */ + private boolean evictNearSync; + + /** */ + private final AtomicInteger idxGen = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TransactionsConfiguration tCfg = new TransactionsConfiguration(); + + tCfg.setDefaultTxConcurrency(PESSIMISTIC); + tCfg.setDefaultTxIsolation(READ_COMMITTED); + + c.setTransactionsConfiguration(tCfg); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(mode); + cc.setAtomicityMode(TRANSACTIONAL); + + cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); + + cc.setSwapEnabled(false); + + cc.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + + // Set only DHT policy, leave default near policy. + cc.setEvictionPolicy(new GridCacheFifoEvictionPolicy<>(10)); + cc.setEvictSynchronized(evictSync); + cc.setEvictNearSynchronized(evictNearSync); + cc.setEvictSynchronizedKeyBufferSize(1); + + cc.setAffinity(new GridCacheModuloAffinityFunction(gridCnt, 1)); + + c.setCacheConfiguration(cc); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + c.setUserAttributes(F.asMap(GridCacheModuloAffinityFunction.IDX_ATTR, idxGen.getAndIncrement())); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** @throws Throwable If failed. */ + public void testNearSyncBackupUnsync() throws Throwable { + gridCnt = 3; + mode = PARTITIONED; + evictNearSync = true; + evictSync = false; + nearEnabled = true; + + checkEvictions(); + } + + /** @throws Throwable If failed. */ + public void testNearSyncBackupSync() throws Throwable { + gridCnt = 3; + mode = PARTITIONED; + evictNearSync = true; + evictSync = true; + nearEnabled = true; + + checkEvictions(); + } + + /** @throws Throwable If failed. */ + public void testNearUnsyncBackupSync() throws Throwable { + gridCnt = 1; + mode = PARTITIONED; + evictNearSync = false; + evictSync = true; + nearEnabled = true; + + try { + startGrid(0); + + assert false : "Grid was started with illegal configuration."; + } + catch (IgniteCheckedException e) { + info("Caught expected exception: " + e); + } + } + + /** + * http://atlassian.gridgain.com/jira/browse/GG-9002 + * + * @throws Throwable If failed. + */ + public void testLocalSync() throws Throwable { + gridCnt = 1; + mode = LOCAL; + evictNearSync = true; + evictSync = true; + nearEnabled = true; + + Ignite g = startGrid(0); + + final GridCache<Integer, Integer> cache = g.cache(null); + + for (int i = 1; i < 20; i++) { + cache.putx(i * gridCnt, i * gridCnt); + + info("Put to cache: " + i * gridCnt); + } + } + + /** @throws Throwable If failed. */ + private void checkEvictions() throws Throwable { + try { + startGrids(gridCnt); + + Ignite ignite = grid(0); + + final GridCache<Integer, Integer> cache = ignite.cache(null); + + // Put 1 entry to primary node. + cache.putx(0, 0); + + Integer nearVal = this.<Integer, Integer>cache(2).get(0); + + assert nearVal == 0 : "Unexpected near value: " + nearVal; + + // Put several vals to primary node. + for (int i = 1; i < 20; i++) { + cache.putx(i * gridCnt, i * gridCnt); + + info("Put to cache: " + i * gridCnt); + } + + for (int i = 0; i < 3; i++) { + try { + assert cache(2).get(0) == null : "Entry has not been evicted from near node for key: " + 0; + assert cache(1).get(0) == null : "Entry has not been evicted from backup node for key: " + 0; + assert cache.get(0) == null : "Entry has not been evicted from primary node for key: " + 0; + } + catch (Throwable e) { + if (i == 2) + // No attempts left. + throw e; + + U.warn(log, "Check failed (will retry in 2000 ms): " + e); + + // Unwind evicts? + cache.get(0); + + U.sleep(2000); + } + } + + for (int i = 0; i < 3; i++) { + info("Primary key set: " + new TreeSet<>(this.<Integer, Integer>dht(0).keySet())); + info("Primary near key set: " + new TreeSet<>(this.<Integer, Integer>near(0).keySet())); + + info("Backup key set: " + new TreeSet<>(this.<Integer, Integer>dht(1).keySet())); + info("Backup near key set: " + new TreeSet<>(this.<Integer, Integer>near(1).keySet())); + + info("Near key set: " + new TreeSet<>(this.<Integer, Integer>dht(2).keySet())); + info("Near node near key set: " + new TreeSet<>(this.<Integer, Integer>near(2).keySet())); + + try { + assert cache.size() == 10 : "Invalid cache size [size=" + cache.size() + + ", keys=" + new TreeSet<>(cache.keySet()) + ']'; + assert cache.size() == 10 : "Invalid key size [size=" + cache.size() + + ", keys=" + new TreeSet<>(cache.keySet()) + ']'; + + assert cache(2).isEmpty(); + + break; + } + catch (Throwable e) { + if (i == 2) + // No attempts left. + throw e; + + U.warn(log, "Check failed (will retry in 2000 ms): " + e); + + // Unwind evicts? + cache.get(0); + + U.sleep(2000); + } + } + } + catch (Throwable t) { + error("Test failed.", t); + + throw t; + } + finally { + stopAllGrids(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a916db69/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEmptyEntriesAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEmptyEntriesAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEmptyEntriesAbstractSelfTest.java new file mode 100644 index 0000000..3902479 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEmptyEntriesAbstractSelfTest.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.eviction; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.cache.eviction.GridCacheEvictionPolicy; +import org.apache.ignite.cache.eviction.fifo.GridCacheFifoEvictionPolicy; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import javax.cache.*; +import javax.cache.configuration.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; + +/** + * Tests that cache handles {@code setAllowEmptyEntries} flag correctly. + */ +public abstract class GridCacheEmptyEntriesAbstractSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private GridCacheEvictionPolicy<?, ?> plc; + + /** */ + private GridCacheEvictionPolicy<?, ?> nearPlc; + + /** Test store. */ + private CacheStore<String, String> testStore; + + /** Tx concurrency to use. */ + private IgniteTxConcurrency txConcurrency; + + /** Tx isolation to use. */ + private IgniteTxIsolation txIsolation; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TransactionsConfiguration txCfg = c.getTransactionsConfiguration(); + + txCfg.setDefaultTxConcurrency(txConcurrency); + txCfg.setDefaultTxIsolation(txIsolation); + txCfg.setTxSerializableEnabled(true); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(cacheMode()); + cc.setAtomicityMode(TRANSACTIONAL); + + cc.setSwapEnabled(false); + + cc.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + cc.setDistributionMode(GridCacheDistributionMode.PARTITIONED_ONLY); + + cc.setEvictionPolicy(plc); + cc.setNearEvictionPolicy(nearPlc); + cc.setEvictSynchronizedKeyBufferSize(1); + + cc.setEvictNearSynchronized(true); + cc.setEvictSynchronized(true); + + if (testStore != null) { + cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(testStore)); + cc.setReadThrough(true); + cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); + } + else + cc.setCacheStoreFactory(null); + + c.setCacheConfiguration(cc); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + return c; + } + + /** + * Starts grids depending on testing cache. + * + * @return First grid node. + * @throws Exception If failed. + */ + protected abstract Ignite startGrids() throws Exception; + + /** @return Cache mode for particular test. */ + protected abstract GridCacheMode cacheMode(); + + /** + * Tests FIFO eviction policy. + * + * @throws Exception If failed. + */ + public void testFifo() throws Exception { + plc = new GridCacheFifoEvictionPolicy(50); + nearPlc = new GridCacheFifoEvictionPolicy(50); + + checkPolicy(); + } + + /** + * Checks policy with and without store set. + * + * @throws Exception If failed. + */ + private void checkPolicy() throws Exception { + testStore = null; + + checkPolicy0(); + + testStore = new CacheStoreAdapter<String, String>() { + @Override public String load(String key) { + return null; + } + + @Override public void write(Cache.Entry<? extends String, ? extends String> e) { + // No-op. + } + + @Override public void delete(Object key) { + // No-op. + } + }; + + checkPolicy0(); + } + + /** + * Tests preset eviction policy. + * + * @throws Exception If failed. + */ + private void checkPolicy0() throws Exception { + for (IgniteTxConcurrency concurrency : IgniteTxConcurrency.values()) { + txConcurrency = concurrency; + + for (IgniteTxIsolation isolation : IgniteTxIsolation.values()) { + txIsolation = isolation; + + Ignite g = startGrids(); + + GridCache<String, String> cache = g.cache(null); + + try { + info(">>> Checking policy [txConcurrency=" + txConcurrency + ", txIsolation=" + txIsolation + + ", plc=" + plc + ", nearPlc=" + nearPlc + ']'); + + checkExplicitTx(cache); + + checkImplicitTx(cache); + } + finally { + stopAllGrids(); + } + } + } + } + + /** + * Checks that gets work for implicit txs. + * + * @param cache Cache to test. + * @throws Exception If failed. + */ + private void checkImplicitTx(GridCache<String, String> cache) throws Exception { + assertNull(cache.get("key1")); + assertNull(cache.getAsync("key2").get()); + + assertTrue(cache.getAll(F.asList("key3", "key4")).isEmpty()); + assertTrue(cache.getAllAsync(F.asList("key5", "key6")).get().isEmpty()); + + cache.put("key7", "key7"); + cache.remove("key7", "key7"); + assertNull(cache.get("key7")); + + checkEmpty(cache); + } + + /** + * Checks that gets work for implicit txs. + * + * @param cache Cache to test. + * @throws Exception If failed. + */ + private void checkExplicitTx(GridCache<String, String> cache) throws Exception { + IgniteTx tx = cache.txStart(); + + try { + assertNull(cache.get("key1")); + + tx.commit(); + } + finally { + tx.close(); + } + + tx = cache.txStart(); + + try { + assertNull(cache.getAsync("key2").get()); + + tx.commit(); + } + finally { + tx.close(); + } + + tx = cache.txStart(); + + try { + assertTrue(cache.getAll(F.asList("key3", "key4")).isEmpty()); + + tx.commit(); + } + finally { + tx.close(); + } + + tx = cache.txStart(); + + try { + assertTrue(cache.getAllAsync(F.asList("key5", "key6")).get().isEmpty()); + + tx.commit(); + } + finally { + tx.close(); + } + + tx = cache.txStart(); + + try { + cache.put("key7", "key7"); + + cache.remove("key7"); + + assertNull(cache.get("key7")); + + tx.commit(); + } + finally { + tx.close(); + } + + checkEmpty(cache); + } + + /** + * Checks that cache is empty. + * + * @param cache Cache to check. + * @throws org.apache.ignite.IgniteInterruptedException If interrupted while sleeping. + */ + @SuppressWarnings({"ErrorNotRethrown", "TypeMayBeWeakened"}) + private void checkEmpty(GridCache<String, String> cache) throws IgniteInterruptedException { + for (int i = 0; i < 3; i++) { + try { + assertTrue(cache.entrySet().toString(), cache.entrySet().isEmpty()); + + break; + } + catch (AssertionError e) { + if (i == 2) + throw e; + + info(">>> Cache is not empty, flushing evictions."); + + U.sleep(1000); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a916db69/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEmptyEntriesLocalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEmptyEntriesLocalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEmptyEntriesLocalSelfTest.java new file mode 100644 index 0000000..7a40220 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEmptyEntriesLocalSelfTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.eviction; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; + +/** + * + */ +public class GridCacheEmptyEntriesLocalSelfTest extends GridCacheEmptyEntriesAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected Ignite startGrids() throws Exception { + return startGrid(); + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return GridCacheMode.LOCAL; + } + + /** {@inheritDoc} */ + @Override public void testFifo() throws Exception { + super.testFifo(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a916db69/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEmptyEntriesPartitionedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEmptyEntriesPartitionedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEmptyEntriesPartitionedSelfTest.java new file mode 100644 index 0000000..9ddece4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEmptyEntriesPartitionedSelfTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.eviction; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; + +/** + * Test allow empty entries flag on partitioned cache. + */ +public class GridCacheEmptyEntriesPartitionedSelfTest extends GridCacheEmptyEntriesAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected Ignite startGrids() throws Exception { + return startGridsMultiThreaded(3); + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return GridCacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override public void testFifo() throws Exception { + super.testFifo(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a916db69/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEvictionAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEvictionAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEvictionAbstractTest.java new file mode 100644 index 0000000..cf7015d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEvictionAbstractTest.java @@ -0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.eviction; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Base class for eviction tests. + */ +public abstract class GridCacheEvictionAbstractTest<T extends GridCacheEvictionPolicy<?, ?>> + extends GridCommonAbstractTest { + /** IP finder. */ + protected static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Replicated cache. */ + protected GridCacheMode mode = REPLICATED; + + /** Near enabled flag. */ + protected boolean nearEnabled; + + /** Evict backup sync. */ + protected boolean evictSync; + + /** Evict near sync. */ + protected boolean evictNearSync = true; + + /** Policy max. */ + protected int plcMax = 10; + + /** Near policy max. */ + protected int nearMax = 3; + + /** Synchronous commit. */ + protected boolean syncCommit; + + /** */ + protected int gridCnt = 2; + + /** */ + protected GridCacheEvictionFilter<?, ?> filter; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(mode); + cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); + cc.setEvictionPolicy(createPolicy(plcMax)); + cc.setNearEvictionPolicy(createNearPolicy(nearMax)); + cc.setEvictSynchronized(evictSync); + cc.setEvictNearSynchronized(evictNearSync); + cc.setSwapEnabled(false); + cc.setWriteSynchronizationMode(syncCommit ? FULL_SYNC : FULL_ASYNC); + cc.setStartSize(plcMax); + cc.setAtomicityMode(TRANSACTIONAL); + + if (mode == PARTITIONED) + cc.setBackups(1); + + if (filter != null) + cc.setEvictionFilter(filter); + + c.setCacheConfiguration(cc); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + c.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + + c.setIncludeProperties(); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + filter = null; + + super.afterTestsStopped(); + } + + /** + * @param arr Array. + * @param idx Index. + * @return Entry at the index. + */ + protected MockEntry entry(MockEntry[] arr, int idx) { + MockEntry e = arr[idx]; + + if (e.isEvicted()) + e = arr[idx] = new MockEntry(e.getKey()); + + return e; + } + + /** + * @param prefix Prefix. + * @param p Policy. + */ + protected void info(String prefix, GridCacheEvictionPolicy<?, ?> p) { + info(prefix + ": " + p.toString()); + } + + /** @param p Policy. */ + protected void info(GridCacheEvictionPolicy<?, ?> p) { + info(p.toString()); + } + + /** + * @param c1 Policy collection. + * @param c2 Expected list. + */ + protected void check(Collection<GridCacheEntry<String, String>> c1, MockEntry... c2) { + check(c1, F.asList(c2)); + } + + /** @return Policy. */ + @SuppressWarnings({"unchecked"}) + protected T policy() { + return (T)grid().cache(null).configuration().getEvictionPolicy(); + } + + /** + * @param i Grid index. + * @return Policy. + */ + @SuppressWarnings({"unchecked"}) + protected T policy(int i) { + return (T)grid(i).cache(null).configuration().getEvictionPolicy(); + } + + /** + * @param i Grid index. + * @return Policy. + */ + @SuppressWarnings({"unchecked"}) + protected T nearPolicy(int i) { + return (T)grid(i).cache(null).configuration().getNearEvictionPolicy(); + } + + /** + * @param c1 Policy collection. + * @param c2 Expected list. + */ + protected void check(Collection<GridCacheEntry<String, String>> c1, List<MockEntry> c2) { + assert c1.size() == c2.size() : "Mismatch [actual=" + string(c1) + ", expected=" + string(c2) + ']'; + + assert c1.containsAll(c2) : "Mismatch [actual=" + string(c1) + ", expected=" + string(c2) + ']'; + + int i = 0; + + // Check order. + for (GridCacheEntry<String, String> e : c1) + assertEquals(e, c2.get(i++)); + } + + /** + * @param c Collection. + * @return String. + */ + protected String string(Iterable<? extends GridCacheEntry> c) { + return "[" + F.fold(c, "", new C2<GridCacheEntry, String, String>() { + @Override public String apply(GridCacheEntry e, String b) { + return b.isEmpty() ? e.getKey().toString() : b + ", " + e.getKey(); + } + }) + "]]"; + } + + /** @throws Exception If failed. */ + public void testPartitionedNearDisabled() throws Exception { + mode = PARTITIONED; + nearEnabled = false; + plcMax = 10; + syncCommit = true; + + gridCnt = 2; + + checkPartitioned(plcMax, plcMax, false); + } + + /** @throws Exception If failed. */ + public void testPartitionedNearEnabled() throws Exception { + mode = PARTITIONED; + nearEnabled = true; + nearMax = 3; + plcMax = 10; + evictNearSync = true; + syncCommit = true; + + gridCnt = 2; + + checkPartitioned(0, 0, true); // Near size is 0 because of backups present. + } + + /** @throws Exception If failed. */ + public void testPartitionedNearDisabledMultiThreaded() throws Exception { + mode = PARTITIONED; + nearEnabled = false; + plcMax = 100; + evictSync = false; + + gridCnt = 2; + + checkPartitionedMultiThreaded(gridCnt); + } + + /** @throws Exception If failed. */ + public void testPartitionedNearDisabledBackupSyncMultiThreaded() throws Exception { + mode = PARTITIONED; + nearEnabled = false; + plcMax = 100; + evictSync = true; + + gridCnt = 2; + + checkPartitionedMultiThreaded(gridCnt); + } + + /** @throws Exception If failed. */ + public void testPartitionedNearEnabledMultiThreaded() throws Exception { + mode = PARTITIONED; + nearEnabled = true; + plcMax = 10; + evictSync = false; + + gridCnt = 2; + + checkPartitionedMultiThreaded(gridCnt); + } + + /** @throws Exception If failed. */ + public void testPartitionedNearEnabledBackupSyncMultiThreaded() throws Exception { + mode = PARTITIONED; + nearEnabled = true; + plcMax = 10; + evictSync = true; + + gridCnt = 2; + + checkPartitionedMultiThreaded(gridCnt); + } + + /** + * @param endSize Final near size. + * @param endPlcSize Final near policy size. + * @throws Exception If failed. + */ + private void checkPartitioned(int endSize, int endPlcSize, boolean near) throws Exception { + startGridsMultiThreaded(gridCnt); + + try { + Random rand = new Random(); + + int cnt = 500; + + for (int i = 0; i < cnt; i++) { + GridCache<Integer, String> cache = grid(rand.nextInt(2)).cache(null); + + int key = rand.nextInt(100); + String val = Integer.toString(key); + + cache.put(key, val); + + if (i % 100 == 0) + info("Stored cache object for key [key=" + key + ", idx=" + i + ']'); + } + + if (near) { + for (int i = 0; i < gridCnt; i++) + assertEquals(endSize, near(i).nearSize()); + + if (endPlcSize >= 0) + checkNearPolicies(endPlcSize); + } + else { + for (int i = 0; i < gridCnt; i++) { + int actual = colocated(i).size(); + + assertTrue("Cache size is greater then policy size [expected=" + endSize + ", actual=" + actual + ']', + actual <= endSize); + } + + checkPolicies(endPlcSize); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @param gridCnt Grid count. + * @throws Exception If failed. + */ + protected void checkPartitionedMultiThreaded(int gridCnt) throws Exception { + try { + startGridsMultiThreaded(gridCnt); + + final Random rand = new Random(); + + final AtomicInteger cntr = new AtomicInteger(); + + multithreaded(new Callable() { + @Nullable @Override public Object call() throws Exception { + int cnt = 100; + + for (int i = 0; i < cnt && !Thread.currentThread().isInterrupted(); i++) { + GridCache<Integer, String> cache = grid(rand.nextInt(2)).cache(null); + + int key = rand.nextInt(1000); + String val = Integer.toString(key); + + try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + String v = cache.get(key); + + assert v == null || v.equals(Integer.toString(key)) : "Invalid value for key [key=" + key + + ", val=" + v + ']'; + + cache.put(key, val); + + tx.commit(); + } + + if (cntr.incrementAndGet() % 100 == 0) + info("Stored cache object for key [key=" + key + ", idx=" + i + ']'); + } + + return null; + } + }, 10); + } + finally { + stopAllGrids(); + } + } + + /** + * @param plcMax Policy max. + * @return Policy. + */ + protected abstract T createPolicy(int plcMax); + + /** + * @param nearMax Near max. + * @return Policy. + */ + protected abstract T createNearPolicy(int nearMax); + + /** + * Performs after-test near policy check. + * + * @param nearMax Near max. + */ + protected abstract void checkNearPolicies(int nearMax); + + /** + * Performs after-test policy check. + * + * @param plcMax Maximum allowed size of ploicy. + */ + protected abstract void checkPolicies(int plcMax); + + /** + * + */ + @SuppressWarnings({"PublicConstructorInNonPublicClass"}) + protected static class MockEntry extends GridCacheMockEntry<String, String> { + /** */ + private final GridCacheProjection<String, String> parent; + + /** Entry value. */ + private String val; + + /** @param key Key. */ + public MockEntry(String key) { + super(key); + + parent = null; + } + + /** + * @param key Key. + * @param val Value. + */ + public MockEntry(String key, String val) { + super(key); + + this.val = val; + parent = null; + } + + /** + * @param key Key. + * @param parent Parent. + */ + public MockEntry(String key, @Nullable GridCacheProjection<String, String> parent) { + super(key); + + this.parent = parent; + } + + /** {@inheritDoc} */ + @Override public String getValue() throws IllegalStateException { + return val; + } + + /** {@inheritDoc} */ + @Override public String setValue(String val) { + String old = this.val; + + this.val = val; + + return old; + } + + /** {@inheritDoc} */ + @Override public GridCacheProjection<String, String> projection() { + return parent; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a916db69/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEvictionFilterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEvictionFilterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEvictionFilterSelfTest.java new file mode 100644 index 0000000..532d574 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEvictionFilterSelfTest.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.eviction; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; + +/** + * Base class for eviction tests. + */ +public class GridCacheEvictionFilterSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Replicated cache. */ + private GridCacheMode mode = REPLICATED; + + /** Near enabled flag. */ + private boolean nearEnabled; + + /** */ + private EvictionFilter filter; + + /** Policy. */ + private GridCacheEvictionPolicy<Object, Object> plc = new GridCacheEvictionPolicy<Object, Object>() { + @Override public void onEntryAccessed(boolean rmv, GridCacheEntry entry) { + assert !(entry.peek() instanceof Integer); + } + }; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(mode); + cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); + cc.setEvictionPolicy(plc); + cc.setNearEvictionPolicy(plc); + cc.setEvictSynchronized(false); + cc.setEvictNearSynchronized(false); + cc.setSwapEnabled(false); + cc.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + cc.setEvictionFilter(filter); + cc.setPreloadMode(SYNC); + cc.setAtomicityMode(TRANSACTIONAL); + + if (mode == PARTITIONED) + cc.setBackups(1); + + c.setCacheConfiguration(cc); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + return c; + } + + /** @throws Exception If failed. */ + public void testLocal() throws Exception { + mode = LOCAL; + + checkEvictionFilter(); + } + + /** @throws Exception If failed. */ + public void testReplicated() throws Exception { + mode = REPLICATED; + + checkEvictionFilter(); + } + + /** @throws Exception If failed. */ + public void testPartitioned() throws Exception { + mode = PARTITIONED; + nearEnabled = true; + + checkEvictionFilter(); + } + + /** @throws Exception If failed. */ + public void testPartitionedNearDisabled() throws Exception { + mode = PARTITIONED; + nearEnabled = false; + + checkEvictionFilter(); + } + + /** @throws Exception If failed. */ + @SuppressWarnings("BusyWait") + private void checkEvictionFilter() throws Exception { + filter = new EvictionFilter(); + + startGridsMultiThreaded(2); + + try { + Ignite g = grid(0); + + GridCache<Object, Object> c = g.cache(null); + + int cnt = 1; + + for (int i = 0; i < cnt; i++) + c.putx(i, i); + + Map<Object, AtomicInteger> cnts = filter.counts(); + + int exp = mode == LOCAL ? 1 : mode == REPLICATED ? 2 : nearEnabled ? 3 : 2; + + for (int j = 0; j < 3; j++) { + boolean success = true; + + for (int i = 0; i < cnt; i++) { + int cnt0 = cnts.get(i).get(); + + success = cnt0 == exp; + + if (!success) { + U.warn(log, "Invalid count for key [key=" + i + ", cnt=" + cnt0 + ", expected=" + exp + ']'); + + break; + } + else + info("Correct count for key [key=" + i + ", cnt=" + cnt0 + ']'); + } + + if (success) + break; + + if (j < 2) + Thread.sleep(1000); + else + assert false : "Test has not succeeded (see log for details)."; + } + } + finally { + stopAllGrids(); + } + } + + /** + * This test case is just to visualize a support issue from client. It does not fail. + * + * @throws Exception If failed. + */ + public void _testPartitionedMixed() throws Exception { + mode = PARTITIONED; + nearEnabled = false; + + filter = new EvictionFilter(); + + Ignite g = startGrid(); + + GridCache<Object, Object> cache = g.cache(null); + + try { + int id = 1; + + cache.putx(id++, 1); + cache.putx(id++, 2); + + for (int i = id + 1; i < 10; i++) { + cache.putx(id, id); + + cache.putx(i, String.valueOf(i)); + } + + info(">>>> " + cache.get(1)); + info(">>>> " + cache.get(2)); + info(">>>> " + cache.get(3)); + } + finally { + stopGrid(); + } + } + + /** + * + */ + private final class EvictionFilter implements GridCacheEvictionFilter<Object, Object> { + /** */ + private final ConcurrentMap<Object, AtomicInteger> cnts = new ConcurrentHashMap<>(); + + /** {@inheritDoc} */ + @Override public boolean evictAllowed(GridCacheEntry<Object, Object> entry) { + AtomicInteger i = cnts.get(entry.getKey()); + + if (i == null) { + AtomicInteger old = cnts.putIfAbsent(entry.getKey(), i = new AtomicInteger()); + + if (old != null) + i = old; + } + + i.incrementAndGet(); + + String grid = entry.projection().gridProjection().ignite().name(); + + boolean ret = !(entry.peek() instanceof Integer); + + if (!ret) + info(">>> Not evicting key [grid=" + grid + ", key=" + entry.getKey() + ", cnt=" + i.get() + ']'); + else + info(">>> Evicting key [grid=" + grid + ", key=" + entry.getKey() + ", cnt=" + i.get() + ']'); + + return ret; + } + + /** @return Counts. */ + ConcurrentMap<Object, AtomicInteger> counts() { + return cnts; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a916db69/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEvictionLockUnlockSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEvictionLockUnlockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEvictionLockUnlockSelfTest.java new file mode 100644 index 0000000..fe0991b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEvictionLockUnlockSelfTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.eviction; + +import org.apache.ignite.cache.*; +import org.apache.ignite.*; +import org.apache.ignite.cache.eviction.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * + */ +public class GridCacheEvictionLockUnlockSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Evict latch. */ + private static CountDownLatch evictLatch; + + /** Evict counter. */ + private static final AtomicInteger evictCnt = new AtomicInteger(); + + /** Touch counter. */ + private static final AtomicInteger touchCnt = new AtomicInteger(); + + /** Cache mode. */ + private GridCacheMode mode; + + /** Number of grids to start. */ + private int gridCnt; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(mode); + cc.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + cc.setEvictionPolicy(new EvictionPolicy()); + cc.setNearEvictionPolicy(new EvictionPolicy()); + cc.setEvictNearSynchronized(false); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setDistributionMode(NEAR_PARTITIONED); + + if (mode == PARTITIONED) + cc.setBackups(1); + + c.setCacheConfiguration(cc); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(ipFinder); + + c.setDiscoverySpi(discoSpi); + + return c; + } + + /** @throws Exception If failed. */ + public void testLocal() throws Exception { + mode = LOCAL; + gridCnt = 1; + + doTest(); + } + + /** @throws Exception If failed. */ + public void testReplicated() throws Exception { + mode = REPLICATED; + gridCnt = 3; + + doTest(); + } + + /** @throws Exception If failed. */ + public void testPartitioned() throws Exception { + mode = PARTITIONED; + gridCnt = 3; + + doTest(); + } + + /** @throws Exception If failed. */ + private void doTest() throws Exception { + try { + startGridsMultiThreaded(gridCnt); + + for (int i = 0; i < gridCnt; i++) + grid(i).events().localListen(new EvictListener(), EVT_CACHE_ENTRY_EVICTED); + + for (int i = 0; i < gridCnt; i++) { + reset(); + + IgniteCache<Object, Object> cache = jcache(i); + + cache.lock("key").lock(); + cache.lock("key").unlock(); + + assertTrue(evictLatch.await(3, SECONDS)); + + assertEquals(gridCnt, evictCnt.get()); + assertEquals(gridCnt, touchCnt.get()); + + for (int j = 0; j < gridCnt; j++) + assertFalse(cache(j).containsKey("key")); + } + } + finally { + stopAllGrids(); + } + } + + /** @throws Exception If failed. */ + private void reset() throws Exception { + evictLatch = new CountDownLatch(gridCnt); + + evictCnt.set(0); + touchCnt.set(0); + } + + /** Eviction event listener. */ + private static class EvictListener implements IgnitePredicate<IgniteEvent> { + /** {@inheritDoc} */ + @Override public boolean apply(IgniteEvent evt) { + assert evt.type() == EVT_CACHE_ENTRY_EVICTED; + + evictCnt.incrementAndGet(); + + evictLatch.countDown(); + + return true; + } + } + + /** Eviction policy. */ + private static class EvictionPolicy implements GridCacheEvictionPolicy<Object, Object> { + /** {@inheritDoc} */ + @Override public void onEntryAccessed(boolean rmv, GridCacheEntry<Object, Object> entry) { + touchCnt.incrementAndGet(); + + entry.evict(); + } + } +}