http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearEnabledPrimaryWriteOrderFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearEnabledPrimaryWriteOrderFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearEnabledPrimaryWriteOrderFullApiSelfTest.java new file mode 100644 index 0000000..56ad5d2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearEnabledPrimaryWriteOrderFullApiSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.GridCacheDistributionMode.*; + +/** + * Tests atomic cache with near cache enabled. 
+ */ +public class GridCacheAtomicNearEnabledPrimaryWriteOrderFullApiSelfTest + extends GridCacheAtomicPrimaryWriteOrderFullApiSelfTest { + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return NEAR_PARTITIONED; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicPrimaryWriteOrderFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicPrimaryWriteOrderFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicPrimaryWriteOrderFullApiSelfTest.java new file mode 100644 index 0000000..9a2022f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicPrimaryWriteOrderFullApiSelfTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.GridCacheAtomicWriteOrderMode.*; + +/** + * Test for atomic cache with primary write order mode. 
+ */ +public class GridCacheAtomicPrimaryWriteOrderFullApiSelfTest extends GridCacheAtomicFullApiSelfTest { + /** {@inheritDoc} */ + @Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() { + return PRIMARY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicPrimaryWriteOrderReloadAllSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicPrimaryWriteOrderReloadAllSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicPrimaryWriteOrderReloadAllSelfTest.java new file mode 100644 index 0000000..1c4b338 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicPrimaryWriteOrderReloadAllSelfTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.GridCacheAtomicWriteOrderMode.*; + +/** + * Tests reloadAll for colocated cache with primary write order. + */ +public class GridCacheAtomicPrimaryWriteOrderReloadAllSelfTest extends GridCacheAtomicReloadAllSelfTest { + /** {@inheritDoc} */ + @Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() { + return PRIMARY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicReloadAllSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicReloadAllSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicReloadAllSelfTest.java new file mode 100644 index 0000000..b609d29 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicReloadAllSelfTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; +import org.gridgain.grid.kernal.processors.cache.distributed.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; + +/** + * Tests reloadAll for colocated cache. + */ +public class GridCacheAtomicReloadAllSelfTest extends GridCachePartitionedReloadAllAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected GridCacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected boolean nearEnabled() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheClientOnlySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheClientOnlySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheClientOnlySelfTest.java new file mode 100644 index 0000000..67fd6a3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheClientOnlySelfTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; +import org.gridgain.grid.kernal.processors.cache.distributed.*; + +import static org.apache.ignite.cache.GridCacheDistributionMode.*; + +/** + * Client only test. + */ +public class GridCacheClientOnlySelfTest extends GridCacheClientModesAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } + + /** {@inheritDoc} */ + @Override protected boolean clientOnly() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedDebugTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedDebugTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedDebugTest.java new file mode 100644 index 0000000..eb3b86f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedDebugTest.java @@ -0,0 +1,977 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.junits.common.*; + +import javax.cache.configuration.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Tests for colocated cache. + */ +public class GridCacheColocatedDebugTest extends GridCommonAbstractTest { + /** IP finder. 
*/ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Test thread count. */ + private static final int THREAD_CNT = 10; + + /** Store enable flag. */ + private boolean storeEnabled; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setDistributionMode(PARTITIONED_ONLY); + cacheCfg.setAffinity(new GridCacheConsistentHashAffinityFunction(false, 30)); + cacheCfg.setBackups(1); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + cacheCfg.setSwapEnabled(false); + + if (storeEnabled) { + cacheCfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new GridCacheTestStore())); + cacheCfg.setReadThrough(true); + cacheCfg.setWriteThrough(true); + cacheCfg.setLoadPreviousValue(true); + } + else + cacheCfg.setCacheStoreFactory(null); + + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testSimplestPessimistic() throws Exception { + checkSinglePut(false, PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testSimpleOptimistic() throws Exception { + checkSinglePut(true, OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testReentry() throws Exception { + checkReentry(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testDistributedInTxSeparatePessimistic() throws Exception { + checkDistributedPut(true, true, PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. 
+ */ + public void testDistributedInTxPessimistic() throws Exception { + checkDistributedPut(true, false, PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testDistributedSeparatePessimistic() throws Exception { + checkDistributedPut(false, true, PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testDistributedPessimistic() throws Exception { + checkDistributedPut(false, false, PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testDistributedNonLocalInTxSeparatePessimistic() throws Exception { + checkNonLocalPuts(true, true, PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testDistributedNonLocalInTxPessimistic() throws Exception { + checkNonLocalPuts(true, false, PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testDistributedNonLocalSeparatePessimistic() throws Exception { + checkNonLocalPuts(false, true, PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testDistributedNonLocalPessimistic() throws Exception { + checkNonLocalPuts(false, false, PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testRollbackSeparatePessimistic() throws Exception { + checkRollback(true, PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testDistributedInTxSeparateOptimistic() throws Exception { + checkDistributedPut(true, true, OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testDistributedInTxOptimistic() throws Exception { + checkDistributedPut(true, false, OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. 
+ */ + public void testDistributedNonLocalInTxSeparateOptimistic() throws Exception { + checkNonLocalPuts(true, true, OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testDistributedNonLocalInTxOptimistic() throws Exception { + checkNonLocalPuts(true, false, OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testRollbackSeparateOptimistic() throws Exception { + checkRollback(true, OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testRollback() throws Exception { + checkRollback(false, PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testPutsMultithreadedColocated() throws Exception { + checkPutsMultithreaded(true, false, 100000); + } + + /** + * @throws Exception If failed. + */ + public void testPutsMultithreadedRemote() throws Exception { + checkPutsMultithreaded(false, true, 100000); + } + + /** + * @throws Exception If failed. + */ + public void testPutsMultithreadedMixed() throws Exception { + checkPutsMultithreaded(true, true, 100000); + } + + /** + * @param loc Local puts. + * @param remote Remote puts. + * @param maxIterCnt Number of iterations. + * @throws Exception If failed. 
+ */ + public void checkPutsMultithreaded(boolean loc, boolean remote, final long maxIterCnt) throws Exception { + storeEnabled = false; + + assert loc || remote; + + startGridsMultiThreaded(3); + + try { + final Ignite g0 = grid(0); + Ignite g1 = grid(1); + + final Collection<Integer> keys = new ConcurrentLinkedQueue<>(); + + if (loc) { + Integer key = -1; + + for (int i = 0; i < 20; i++) { + key = forPrimary(g0, key); + + keys.add(key); + } + } + + if (remote) { + Integer key = -1; + + for (int i = 0; i < 20; i++) { + key = forPrimary(g1, key); + + keys.add(key); + } + } + + final AtomicLong iterCnt = new AtomicLong(); + + final int keysCnt = 10; + + IgniteFuture<?> fut = multithreadedAsync(new Runnable() { + @Override public void run() { + // Make thread-local copy to shuffle keys. + List<Integer> threadKeys = new ArrayList<>(keys); + + long threadId = Thread.currentThread().getId(); + + try { + long itNum; + + while ((itNum = iterCnt.getAndIncrement()) < maxIterCnt) { + Collections.shuffle(threadKeys); + + List<Integer> iterKeys = threadKeys.subList(0, keysCnt); + + Collections.sort(iterKeys); + + Map<Integer, String> vals = U.newLinkedHashMap(keysCnt); + + for (Integer key : iterKeys) + vals.put(key, String.valueOf(key) + threadId); + + cache(0).putAll(vals); + + if (itNum > 0 && itNum % 5000 == 0) + info(">>> " + itNum + " iterations completed."); + } + } + catch (IgniteCheckedException e) { + fail("Unexpected exception caught: " + e); + } + } + }, THREAD_CNT); + + fut.get(); + + Thread.sleep(1000); + // Check that all transactions are committed. 
+ for (int i = 0; i < 3; i++) { + GridCacheAdapter<Object, Object> cache = ((GridKernal)grid(i)).internalCache(); + + for (Integer key : keys) { + GridCacheEntryEx<Object, Object> entry = cache.peekEx(key); + + if (entry != null) { + Collection<GridCacheMvccCandidate<Object>> locCands = entry.localCandidates(); + Collection<GridCacheMvccCandidate<Object>> rmtCands = entry.remoteMvccSnapshot(); + + assert locCands == null || locCands.isEmpty() : "Local candidates is not empty [idx=" + i + + ", entry=" + entry + ']'; + assert rmtCands == null || rmtCands.isEmpty() : "Remote candidates is not empty [idx=" + i + + ", entry=" + entry + ']'; + } + } + } + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testLockLockedLocal() throws Exception { + checkLockLocked(true); + } + + /** + * @throws Exception If failed. + */ + public void testLockLockedRemote() throws Exception { + checkLockLocked(false); + } + + /** + * + * @param loc Flag indicating local or remote key should be checked. + * @throws Exception If failed. + */ + private void checkLockLocked(boolean loc) throws Exception { + storeEnabled = false; + + startGridsMultiThreaded(3); + + try { + final Ignite g0 = grid(0); + Ignite g1 = grid(1); + + final Integer key = forPrimary(loc ? 
g0 : g1); + + final CountDownLatch lockLatch = new CountDownLatch(1); + final CountDownLatch unlockLatch = new CountDownLatch(1); + + IgniteFuture<?> unlockFut = multithreadedAsync(new Runnable() { + @Override public void run() { + try { + CacheLock lock = g0.jcache(null).lock(key); + + lock.lock(); + + try { + lockLatch.countDown(); + + U.await(unlockLatch); + } + finally { + lock.unlock(); + } + } + catch (IgniteCheckedException e) { + fail("Unexpected exception: " + e); + } + + } + }, 1); + + U.await(lockLatch); + + assert g0.jcache(null).isLocked(key); + assert !g0.jcache(null).isLockedByThread(key) : "Key can not be locked by current thread."; + + CacheLock lock = g0.jcache(null).lock(key); + + assert !lock.tryLock(); + + assert g0.cache(null).isLocked(key); + assert !g0.cache(null).isLockedByThread(key) : "Key can not be locked by current thread."; + + unlockLatch.countDown(); + unlockFut.get(); + + assert lock.tryLock(); + + lock.unlock(); + } + finally { + stopAllGrids(); + } + } + + /** + * Puts keys 0..99 through grid 0 and then reads each key back inside its own + * PESSIMISTIC, REPEATABLE_READ transaction, expecting the value that was put. + * + * @throws Exception If failed. + */ + public void testPessimisticGet() throws Exception { + storeEnabled = false; + + startGridsMultiThreaded(3); + + Ignite g0 = grid(0); + + try { + for (int i = 0; i < 100; i++) + g0.cache(null).put(i, i); + + for (int i = 0; i < 100; i++) { + try (IgniteTx tx = g0.cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) { + Integer val = (Integer) g0.cache(null).get(i); + + assertEquals((Integer) i, val); + } + } + } + finally { + stopAllGrids(); + } + } + + /** + * Puts two entries on a single grid, optionally inside an explicit transaction, + * and checks that both can be read back. + * + * @param explicitTx Whether to start an explicit tx; if {@code false} the test starts no tx itself. + * @param concurrency Tx concurrency. + * @param isolation Tx isolation. + * @throws Exception If failed. + */ + private void checkSinglePut(boolean explicitTx, IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) + throws Exception { + startGrid(); + + try { + IgniteTx tx = explicitTx ? 
cache().txStart(concurrency, isolation) : null; + + try { + cache().putAll(F.asMap(1, "Hello", 2, "World")); + + if (tx != null) + tx.commit(); + + System.out.println(cache().metrics()); + + assertEquals("Hello", cache().get(1)); + assertEquals("World", cache().get(2)); + assertNull(cache().get(3)); + } + finally { + if (tx != null) + tx.close(); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @param concurrency Tx concurrency. + * @param isolation Tx isolation. + * @throws Exception If failed. + */ + private void checkReentry(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception { + startGrid(); + + try { + IgniteTx tx = cache().txStart(concurrency, isolation); + + try { + String old = (String)cache().get(1); + + assert old == null; + + String replaced = (String)cache().put(1, "newVal"); + + assert replaced == null; + + replaced = (String)cache().put(1, "newVal2"); + + assertEquals("newVal", replaced); + + if (tx != null) + tx.commit(); + + assertEquals("newVal2", cache().get(1)); + assertNull(cache().get(3)); + } + finally { + if (tx != null) + tx.close(); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @param explicitTx Use explicit transactions. + * @param separate Use one-key puts instead of batch. + * @param concurrency Transactions concurrency. + * @param isolation Transaction isolation. + * @throws Exception If failed. + */ + @SuppressWarnings("AssertEqualsBetweenInconvertibleTypes") + private void checkDistributedPut(boolean explicitTx, boolean separate, IgniteTxConcurrency concurrency, + IgniteTxIsolation isolation) throws Exception { + storeEnabled = false; + + startGridsMultiThreaded(3); + + Ignite g0 = grid(0); + Ignite g1 = grid(1); + Ignite g2 = grid(2); + + try { + Integer k0 = forPrimary(g0); + Integer k1 = forPrimary(g1); + Integer k2 = forPrimary(g2); + + Map<Integer, String> map = F.asMap(k0, "val" + k0, k1, "val" + k1, k2, "val" + k2); + + IgniteTx tx = explicitTx ? 
g0.cache(null).txStart(concurrency, isolation) : null; + + try { + if (separate) { + g0.cache(null).put(k0, "val" + k0); + g0.cache(null).put(k1, "val" + k1); + g0.cache(null).put(k2, "val" + k2); + } + else + g0.cache(null).putAll(map); + + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } + + if (separate) { + assertEquals("val" + k0, g0.cache(null).get(k0)); + assertEquals("val" + k1, g0.cache(null).get(k1)); + assertEquals("val" + k2, g0.cache(null).get(k2)); + } + else { + Map<Object, Object> res = g0.cache(null).getAll(map.keySet()); + + assertEquals(map, res); + } + + tx = explicitTx ? g0.cache(null).txStart(concurrency, isolation) : null; + + try { + if (separate) { + g0.cache(null).remove(k0); + g0.cache(null).remove(k1); + g0.cache(null).remove(k2); + } + else + g0.cache(null).removeAll(map.keySet()); + + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } + + if (separate) { + assertEquals(null, g0.cache(null).get(k0)); + assertEquals(null, g0.cache(null).get(k1)); + assertEquals(null, g0.cache(null).get(k2)); + } + else { + Map<Object, Object> res = g0.cache(null).getAll(map.keySet()); + + assertTrue(res.isEmpty()); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @param explicitTx Use explicit transactions. + * @param separate Use one-key puts instead of batch. + * @param concurrency Transactions concurrency. + * @param isolation Transaction isolation. + * @throws Exception If failed. 
+ */ + @SuppressWarnings("AssertEqualsBetweenInconvertibleTypes") + private void checkNonLocalPuts(boolean explicitTx, boolean separate, IgniteTxConcurrency concurrency, + IgniteTxIsolation isolation) throws Exception { + storeEnabled = false; + + startGridsMultiThreaded(3); + + Ignite g0 = grid(0); + Ignite g1 = grid(1); + Ignite g2 = grid(2); + + try { + Integer k1 = forPrimary(g1); + Integer k2 = forPrimary(g2); + + Map<Integer, String> map = F.asMap(k1, "val" + k1, k2, "val" + k2); + + IgniteTx tx = explicitTx ? g0.cache(null).txStart(concurrency, isolation) : null; + + try { + if (separate) { + g0.cache(null).put(k1, "val" + k1); + g0.cache(null).put(k2, "val" + k2); + } + else + g0.cache(null).putAll(map); + + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } + + if (separate) { + assertEquals("val" + k1, g0.cache(null).get(k1)); + assertEquals("val" + k2, g0.cache(null).get(k2)); + } + else { + Map<Object, Object> res = g0.cache(null).getAll(map.keySet()); + + assertEquals(map, res); + } + + tx = explicitTx ? g0.cache(null).txStart(concurrency, isolation) : null; + + try { + if (separate) { + g0.cache(null).remove(k1); + g0.cache(null).remove(k2); + } + else + g0.cache(null).removeAll(map.keySet()); + + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } + + if (separate) { + assertEquals(null, g0.cache(null).get(k1)); + assertEquals(null, g0.cache(null).get(k2)); + } + else { + Map<Object, Object> res = g0.cache(null).getAll(map.keySet()); + + assertTrue(res.isEmpty()); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testWriteThrough() throws Exception { + storeEnabled = true; + + startGridsMultiThreaded(3); + + Ignite g0 = grid(0); + Ignite g1 = grid(1); + Ignite g2 = grid(2); + + try { + // Check local commit. 
+ int k0 = forPrimary(g0); + int k1 = forPrimary(g0, k0); + int k2 = forPrimary(g0, k1); + + checkStoreWithValues(F.asMap(k0, String.valueOf(k0), k1, String.valueOf(k1), k2, String.valueOf(k2))); + + // Reassign keys. + k1 = forPrimary(g1); + k2 = forPrimary(g2); + + checkStoreWithValues(F.asMap(k0, String.valueOf(k0), k1, String.valueOf(k1), k2, String.valueOf(k2))); + + // Check remote only. + + checkStoreWithValues(F.asMap(k1, String.valueOf(k1), k2, String.valueOf(k2))); + } + finally { + stopAllGrids(); + } + } + + /** + * @param map Values to check. + * @throws Exception If failed. + */ + private void checkStoreWithValues(Map<Integer, String> map) throws Exception { + Ignite g0 = grid(0); + Ignite g1 = grid(1); + Ignite g2 = grid(2); + + g0.cache(null).putAll(map); + + checkStore(g0, map); + checkStore(g1, Collections.<Integer, String>emptyMap()); + checkStore(g2, Collections.<Integer, String>emptyMap()); + + clearStores(3); + + try (IgniteTx tx = g0.cache(null).txStart(OPTIMISTIC, READ_COMMITTED)) { + g0.cache(null).putAll(map); + + tx.commit(); + + checkStore(g0, map); + checkStore(g1, Collections.<Integer, String>emptyMap()); + checkStore(g2, Collections.<Integer, String>emptyMap()); + + clearStores(3); + } + } + + /** + * Asserts that the test store configured for the first cache of the given grid + * holds exactly the expected values. + * + * @param ignite Grid to take store from. + * @param map Expected values in store. + * @throws Exception If failed. + */ + private void checkStore(Ignite ignite, Map<Integer, String> map) throws Exception { + String cacheName = ignite.configuration().getCacheConfiguration()[0].getName(); + + // Resolve the cache context on the grid that was passed in, so each node's own store is inspected. + GridCacheContext ctx = ((GridKernal)ignite).context().cache().internalCache(cacheName).context(); + + CacheStore store = ctx.store().configuredStore(); + + assertEquals(map, ((GridCacheTestStore)store).getMap()); + } + + /** + * Clears all stores. + * + * @param cnt Grid count. 
+ */ + private void clearStores(int cnt) { + for (int i = 0; i < cnt; i++) { + String cacheName = grid(i).configuration().getCacheConfiguration()[0].getName(); + + // Use the i-th grid for the cache context as well, so that every node's own store is reset. + GridCacheContext ctx = ((GridKernal)grid(i)).context().cache().internalCache(cacheName).context(); + + CacheStore store = ctx.store().configuredStore(); + + ((GridCacheTestStore)store).reset(); + } + } + + /** + * Checks that puts and removes performed inside a transaction that is rolled back + * leave the previously stored values in place. + * + * @param separate Use one-key puts instead of batch. + * @param concurrency Transactions concurrency. + * @param isolation Transaction isolation. + * @throws Exception If failed. + */ + @SuppressWarnings("AssertEqualsBetweenInconvertibleTypes") + private void checkRollback(boolean separate, IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) + throws Exception { + storeEnabled = false; + + startGridsMultiThreaded(3); + + Ignite g0 = grid(0); + Ignite g1 = grid(1); + Ignite g2 = grid(2); + + try { + Integer k0 = forPrimary(g0); + Integer k1 = forPrimary(g1); + Integer k2 = forPrimary(g2); + + Map<Integer, String> map0 = F.asMap(k0, "val" + k0, k1, "val" + k1, k2, "val" + k2); + + g0.cache(null).putAll(map0); + + Map<Integer, String> map = F.asMap(k0, "value" + k0, k1, "value" + k1, k2, "value" + k2); + + IgniteTx tx = g0.cache(null).txStart(concurrency, isolation); + + try { + if (separate) { + g0.cache(null).put(k0, "value" + k0); + g0.cache(null).put(k1, "value" + k1); + g0.cache(null).put(k2, "value" + k2); + } + else + g0.cache(null).putAll(map); + + tx.rollback(); + } + finally { + tx.close(); + } + + if (separate) { + assertEquals("val" + k0, g0.cache(null).get(k0)); + assertEquals("val" + k1, g0.cache(null).get(k1)); + assertEquals("val" + k2, g0.cache(null).get(k2)); + } + else { + Map<Object, Object> res = g0.cache(null).getAll(map.keySet()); + + assertEquals(map0, res); + } + + tx = g0.cache(null).txStart(concurrency, isolation); + + try { + if (separate) { + g0.cache(null).remove(k0); + g0.cache(null).remove(k1); + g0.cache(null).remove(k2); + } + else + 
g0.cache(null).removeAll(map.keySet()); + + tx.rollback(); + } + finally { + tx.close(); + } + + if (separate) { + assertEquals("val" + k0, g0.cache(null).get(k0)); + assertEquals("val" + k1, g0.cache(null).get(k1)); + assertEquals("val" + k2, g0.cache(null).get(k2)); + } + else { + Map<Object, Object> res = g0.cache(null).getAll(map.keySet()); + + assertEquals(map0, res); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testExplicitLocks() throws Exception { + storeEnabled = false; + + startGrid(); + + try { + IgniteCache<Object, Object> cache = jcache(); + + cache.lock(1).lock(); + + assertNull(cache.getAndPut(1, "key1")); + assertEquals("key1", cache.getAndPut(1, "key2")); + assertEquals("key2", cache.get(1)); + + cache.lock(1).unlock(); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testExplicitLocksDistributed() throws Exception { + storeEnabled = false; + + startGridsMultiThreaded(3); + + Ignite g0 = grid(0); + Ignite g1 = grid(1); + Ignite g2 = grid(2); + + try { + Integer k0 = forPrimary(g0); + Integer k1 = forPrimary(g1); + Integer k2 = forPrimary(g2); + + IgniteCache<Object, Object> cache = jcache(0); + + cache.lock(k0).lock(); + cache.lock(k1).lock(); + cache.lock(k2).lock(); + + cache.put(k0, "val0"); + + cache.putAll(F.asMap(k1, "val1", k2, "val2")); + + assertEquals("val0", cache.get(k0)); + assertEquals("val1", cache.get(k1)); + assertEquals("val2", cache.get(k2)); + + cache.lock(k0).unlock(); + cache.lock(k1).unlock(); + cache.lock(k2).unlock(); + } + finally { + stopAllGrids(); + } + } + + /** + * Gets key for which given node is primary. + * + * @param g Grid. + * @return Key. + */ + private static Integer forPrimary(Ignite g) { + return forPrimary(g, -1); + } + + /** + * Gets next key for which given node is primary, starting with (prev + 1) + * + * @param g Grid. + * @param prev Previous key. + * @return Key. 
+ */ + private static Integer forPrimary(Ignite g, int prev) { + for (int i = prev + 1; i < 10000; i++) { + if (g.cache(null).affinity().mapKeyToNode(i).id().equals(g.cluster().localNode().id())) + return i; + } + + throw new IllegalArgumentException("Can not find key being primary for node: " + g.cluster().localNode().id()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedFailoverSelfTest.java new file mode 100644 index 0000000..18d185c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedFailoverSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; +import org.gridgain.grid.kernal.processors.cache.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; + +/** + * Failover tests for colocated cache. + */ +public class GridCacheColocatedFailoverSelfTest extends GridCacheAbstractFailoverSelfTest { + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedOptimisticTransactionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedOptimisticTransactionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedOptimisticTransactionSelfTest.java new file mode 100644 index 0000000..69b89fd --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedOptimisticTransactionSelfTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.swapspace.file.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.junits.common.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Test ensuring that values are visible inside OPTIMISTIC transaction in co-located cache. + */ +public class GridCacheColocatedOptimisticTransactionSelfTest extends GridCommonAbstractTest { + /** Grid count. */ + private static final int GRID_CNT = 3; + + /** Cache name. */ + private static final String CACHE = "cache"; + + /** Key. */ + private static final Integer KEY = 1; + + /** Value. */ + private static final String VAL = "val"; + + /** Shared IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Grids. 
*/ + private static Ignite[] ignites; + + /** Regular caches. */ + private static GridCache<Integer, String>[] caches; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.getTransactionsConfiguration().setTxSerializableEnabled(true); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + CacheConfiguration cc = new CacheConfiguration(); + + cc.setName(CACHE); + cc.setCacheMode(PARTITIONED); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setDistributionMode(PARTITIONED_ONLY); + cc.setBackups(1); + cc.setWriteSynchronizationMode(FULL_SYNC); + cc.setSwapEnabled(true); + cc.setEvictSynchronized(false); + cc.setEvictNearSynchronized(false); + + c.setDiscoverySpi(disco); + c.setCacheConfiguration(cc); + c.setSwapSpaceSpi(new FileSwapSpaceSpi()); + + return c; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected void beforeTest() throws Exception { + ignites = new Ignite[GRID_CNT]; + caches = new GridCache[GRID_CNT]; + + for (int i = 0; i < GRID_CNT; i++) { + ignites[i] = startGrid(i); + + caches[i] = ignites[i].cache(CACHE); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + caches = null; + ignites = null; + } + + /** + * Perform test. + * + * @throws Exception If failed. 
+ */ + public void testOptimisticTransaction() throws Exception { + for (GridCache<Integer, String> cache : caches) { + IgniteTx tx = cache.txStart(OPTIMISTIC, REPEATABLE_READ); + + try { + cache.put(KEY, VAL); + + tx.commit(); + } + finally { + tx.close(); + } + + for (GridCache<Integer, String> cacheInner : caches) { + tx = cacheInner.txStart(OPTIMISTIC, REPEATABLE_READ); + + try { + assert F.eq(VAL, cacheInner.get(KEY)); + + tx.commit(); + } + finally { + tx.close(); + } + } + + tx = cache.txStart(OPTIMISTIC, REPEATABLE_READ); + + try { + cache.remove(KEY); + + tx.commit(); + } + finally { + tx.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedPreloadRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedPreloadRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedPreloadRestartSelfTest.java new file mode 100644 index 0000000..6cea2c7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedPreloadRestartSelfTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.gridgain.grid.kernal.processors.cache.distributed.*; + +/** + * Colocated preload restart test. + */ +public class GridCacheColocatedPreloadRestartSelfTest extends GridCachePreloadRestartAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected boolean nearEnabled() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedPrimarySyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedPrimarySyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedPrimarySyncSelfTest.java new file mode 100644 index 0000000..8cbe4d3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedPrimarySyncSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; +import org.gridgain.grid.kernal.processors.cache.distributed.*; + +import static org.apache.ignite.cache.GridCacheDistributionMode.*; + +/** + * Test ensuring that PRIMARY_SYNC mode works correctly for co-located cache. + */ +public class GridCacheColocatedPrimarySyncSelfTest extends GridCacheAbstractPrimarySyncSelfTest { + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedReloadAllSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedReloadAllSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedReloadAllSelfTest.java new file mode 100644 index 0000000..1fa0e51 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedReloadAllSelfTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.gridgain.grid.kernal.processors.cache.distributed.*; + +/** + * Tests reloadAll for colocated cache. + */ +public class GridCacheColocatedReloadAllSelfTest extends GridCachePartitionedReloadAllAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected boolean nearEnabled() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedTxExceptionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedTxExceptionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedTxExceptionSelfTest.java new file mode 100644 index 0000000..0cbb98e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedTxExceptionSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; +import org.gridgain.grid.kernal.processors.cache.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; + +/** + * Tests colocated cache. + */ +public class GridCacheColocatedTxExceptionSelfTest extends IgniteTxExceptionAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest.java new file mode 100644 index 0000000..325d8be --- /dev/null +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; +import org.gridgain.grid.kernal.processors.cache.distributed.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.util.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; + +/** + * Test pessimistic tx failures in colocated cache. 
+ */ +public class GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest extends + IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected Collection<Class<?>> ignoreMessageClasses() { + return F.asList((Class<?>)GridNearTxFinishRequest.class, GridDhtTxFinishRequest.class); + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedTxSingleThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedTxSingleThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedTxSingleThreadedSelfTest.java new file mode 100644 index 0000000..f609cd0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheColocatedTxSingleThreadedSelfTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.log4j.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; + +/** + * Test txs in single-threaded mode for colocated cache. + */ +public class GridCacheColocatedTxSingleThreadedSelfTest extends IgniteTxSingleThreadedAbstractTest { + /** Cache debug flag. 
*/ + private static final boolean CACHE_DEBUG = false; + + /** */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @SuppressWarnings({"ConstantConditions"}) + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.getTransactionsConfiguration().setTxSerializableEnabled(true); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + cc.setBackups(1); + cc.setDistributionMode(GridCacheDistributionMode.PARTITIONED_ONLY); + cc.setAtomicityMode(TRANSACTIONAL); + + cc.setEvictionPolicy(null); + + cc.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_ASYNC); + + cc.setPreloadMode(NONE); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + spi.setMaxMissedHeartbeats(Integer.MAX_VALUE); + + c.setDiscoverySpi(spi); + + c.setCacheConfiguration(cc); + + if (CACHE_DEBUG) + resetLog4j(Level.DEBUG, false, GridCacheProcessor.class.getPackage().getName()); + + return c; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override protected int keyCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected int maxKeyValue() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected int iterations() { + return 3000; + } + + /** {@inheritDoc} */ + @Override protected boolean isTestDebug() { + return false; + } + + /** {@inheritDoc} */ + @Override protected boolean printMemoryStats() { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDaemonNodePartitionedSelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDaemonNodePartitionedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDaemonNodePartitionedSelfTest.java new file mode 100644 index 0000000..e7b8804 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDaemonNodePartitionedSelfTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; +import org.gridgain.grid.kernal.processors.cache.*; + +/** + * Tests partitioned cache with daemon node. 
+ */ +public class GridCacheDaemonNodePartitionedSelfTest extends GridCacheDaemonNodeAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return GridCacheMode.PARTITIONED; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtAtomicEvictionNearReadersSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtAtomicEvictionNearReadersSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtAtomicEvictionNearReadersSelfTest.java new file mode 100644 index 0000000..b8a0d98 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtAtomicEvictionNearReadersSelfTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; + +/** + * Test for atomic cache. + */ +public class GridCacheDhtAtomicEvictionNearReadersSelfTest extends GridCacheDhtEvictionNearReadersSelfTest { + /** {@inheritDoc} */ + @Override public GridCacheAtomicityMode atomicityMode() { + return ATOMIC; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtAtomicRemoveFailureTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtAtomicRemoveFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtAtomicRemoveFailureTest.java new file mode 100644 index 0000000..267408b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtAtomicRemoveFailureTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; +import org.gridgain.grid.kernal.processors.cache.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; + +/** + * Tests that removes are not lost when topology changes. + */ +public class GridCacheDhtAtomicRemoveFailureTest extends GridCacheAbstractRemoveFailureTest { + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected GridCacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setDistributionMode(PARTITIONED_ONLY); + cfg.setBackups(1); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java new file mode 100644 index 0000000..e59e1e3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.affinity.consistenthash.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.gridgain.testframework.junits.common.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+
+/**
+ * Unit tests for dht entry.
+ * <p>
+ * Each test puts a value through the cache of the key's primary node, reads it through the cache
+ * of the other node, and then checks how clear, remove and evict behave on the primary
+ * once the other node is listed among the readers of the primary DHT entry.
+ * <p>
+ * All checks use JUnit assertions rather than the {@code assert} keyword. Several checks wrap
+ * the operation under test ({@code clear}, {@code removex}, {@code evictInternal}); with the
+ * {@code assert} keyword those operations are skipped entirely when the JVM runs without
+ * {@code -ea}, which makes the tests either vacuous or failing on the subsequent size checks.
+ */
+public class GridCacheDhtEntrySelfTest extends GridCommonAbstractTest {
+    /** Grid count. */
+    private static final int GRID_CNT = 2;
+
+    /** IP finder handed to the discovery SPI of every configuration built by this test instance. */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Extends the parent configuration with a TCP discovery SPI using {@link #ipFinder} and with a
+     * single cache configuration: {@code PARTITIONED} mode, consistent hash affinity, no backups,
+     * {@code FULL_SYNC} write synchronization, swap disabled, {@code TRANSACTIONAL} atomicity and
+     * {@code NEAR_PARTITIONED} distribution mode.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(spi);
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setAffinity(new GridCacheConsistentHashAffinityFunction(false, 10));
+        cacheCfg.setBackups(0);
+        cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setSwapEnabled(false);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setDistributionMode(NEAR_PARTITIONED);
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(GRID_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Verifies that near and DHT caches of every grid are empty before a test starts.
+     */
+    @SuppressWarnings({"SizeReplaceableByIsEmpty"})
+    @Override protected void beforeTest() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++) {
+            assertEquals("Near cache size is not zero for grid: " + i, 0, near(grid(i)).size());
+            assertEquals("DHT cache size is not zero for grid: " + i, 0, dht(grid(i)).size());
+
+            assertTrue("Near cache is not empty for grid: " + i, near(grid(i)).isEmpty());
+            assertTrue("DHT cache is not empty for grid: " + i, dht(grid(i)).isEmpty());
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Removes all entries through every grid's cache, verifies that near and DHT caches are empty
+     * afterwards, and closes any transaction returned by {@code tx()} on a grid's cache.
+     */
+    @SuppressWarnings({"SizeReplaceableByIsEmpty"})
+    @Override protected void afterTest() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++) {
+            near(grid(i)).removeAll(F.<GridCacheEntry<Integer, String>>alwaysTrue());
+
+            assertEquals("Near cache size is not zero for grid: " + i, 0, near(grid(i)).size());
+            assertEquals("DHT cache size is not zero for grid: " + i, 0, dht(grid(i)).size());
+
+            assertTrue("Near cache is not empty for grid: " + i, near(grid(i)).isEmpty());
+            assertTrue("DHT cache is not empty for grid: " + i, dht(grid(i)).isEmpty());
+        }
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            IgniteTx tx = grid(i).cache(null).tx();
+
+            if (tx != null)
+                tx.close();
+        }
+    }
+
+    /**
+     * @param g Grid.
+     * @return Near cache (the default cache of the grid, obtained with {@code cache(null)}).
+     */
+    private GridCacheProjection<Integer, String> near(Ignite g) {
+        return g.cache(null);
+    }
+
+    /**
+     * @param g Grid.
+     * @return Dht cache, taken from the near cache adapter that backs the grid's internal cache.
+     */
+    @SuppressWarnings({"unchecked", "TypeMayBeWeakened"})
+    private GridDhtCacheAdapter<Integer, String> dht(Ignite g) {
+        return ((GridNearCacheAdapter)((GridKernal)g).internalCache()).dht();
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @return Grid.
+     */
+    private Ignite grid(UUID nodeId) {
+        return G.ignite(nodeId);
+    }
+
+    /**
+     * Checks that {@code clear(key)} on the primary node returns {@code false} and leaves all
+     * cache sizes untouched while the other node is a reader of the entry.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClearWithReaders() throws Exception {
+        Integer key = 1;
+
+        IgniteBiTuple<ClusterNode, ClusterNode> t = getNodes(key);
+
+        ClusterNode primary = t.get1();
+        ClusterNode other = t.get2();
+
+        GridCacheProjection<Integer, String> near0 = near(grid(primary.id()));
+        GridCacheProjection<Integer, String> near1 = near(grid(other.id()));
+
+        assertNotSame(near0, near1);
+
+        GridDhtCacheAdapter<Integer, String> dht0 = dht(grid(primary.id()));
+        GridDhtCacheAdapter<Integer, String> dht1 = dht(grid(other.id()));
+
+        // Put on primary node.
+        String val = "v1";
+
+        near0.put(key, val);
+
+        GridDhtCacheEntry<Integer, String> e0 = (GridDhtCacheEntry<Integer, String>)dht0.peekEx(key);
+        GridDhtCacheEntry<Integer, String> e1 = (GridDhtCacheEntry<Integer, String>)dht1.peekEx(key);
+
+        assertTrue(e0 == null || e0.readers().isEmpty());
+        assertTrue(e1 == null || e1.readers().isEmpty());
+
+        // Get value on other node.
+        assertEquals(val, near1.get(key));
+
+        assertNotNull(e0);
+
+        assertTrue(e0.readers().contains(other.id()));
+        assertTrue(e1 == null || e1.readers().isEmpty());
+
+        // JUnit assertion: clear() must run even when JVM assertions are disabled.
+        assertFalse(near0.clear(key));
+
+        assertEquals(1, near0.size());
+        assertEquals(1, dht0.size());
+
+        assertEquals(1, near1.size());
+        assertEquals(0, dht1.size());
+    }
+
+    /**
+     * Checks that {@code removex(key)} on the primary node returns {@code true} and empties the
+     * near and DHT caches of both nodes even though the other node is a reader of the entry.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRemoveWithReaders() throws Exception {
+        Integer key = 1;
+
+        IgniteBiTuple<ClusterNode, ClusterNode> t = getNodes(key);
+
+        ClusterNode primary = t.get1();
+        ClusterNode other = t.get2();
+
+        GridCacheProjection<Integer, String> near0 = near(grid(primary.id()));
+        GridCacheProjection<Integer, String> near1 = near(grid(other.id()));
+
+        assertNotSame(near0, near1);
+
+        GridDhtCacheAdapter<Integer, String> dht0 = dht(grid(primary.id()));
+        GridDhtCacheAdapter<Integer, String> dht1 = dht(grid(other.id()));
+
+        // Put on primary node.
+        String val = "v1";
+
+        near0.put(key, val);
+
+        GridDhtCacheEntry<Integer, String> e0 = (GridDhtCacheEntry<Integer, String>)dht0.peekEx(key);
+        GridDhtCacheEntry<Integer, String> e1 = (GridDhtCacheEntry<Integer, String>)dht1.peekEx(key);
+
+        assertTrue(e0 == null || e0.readers().isEmpty());
+        assertTrue(e1 == null || e1.readers().isEmpty());
+
+        // Get value on other node.
+        assertEquals(val, near1.get(key));
+
+        assertNotNull(e0);
+
+        assertTrue(e0.readers().contains(other.id()));
+        assertTrue(e1 == null || e1.readers().isEmpty());
+
+        // JUnit assertion: removex() must run even when JVM assertions are disabled,
+        // otherwise the size checks below fail for the wrong reason.
+        assertTrue(near0.removex(key));
+
+        assertEquals(0, near0.size());
+        assertEquals(0, dht0.size());
+
+        assertEquals(0, near1.size());
+        assertEquals(0, dht1.size());
+    }
+
+    /**
+     * Checks that {@code evictInternal} on the primary DHT entry returns {@code false}, for both
+     * values of its first argument, and leaves all cache sizes untouched while the other node is
+     * a reader of the entry.
+     *
+     * @throws Exception If failed.
+     */
+    public void testEvictWithReaders() throws Exception {
+        Integer key = 1;
+
+        IgniteBiTuple<ClusterNode, ClusterNode> t = getNodes(key);
+
+        ClusterNode primary = t.get1();
+        ClusterNode other = t.get2();
+
+        GridCacheProjection<Integer, String> near0 = near(grid(primary.id()));
+        GridCacheProjection<Integer, String> near1 = near(grid(other.id()));
+
+        assertNotSame(near0, near1);
+
+        GridDhtCacheAdapter<Integer, String> dht0 = dht(grid(primary.id()));
+        GridDhtCacheAdapter<Integer, String> dht1 = dht(grid(other.id()));
+
+        // Put on primary node.
+        String val = "v1";
+
+        near0.put(key, val);
+
+        GridDhtCacheEntry<Integer, String> e0 = (GridDhtCacheEntry<Integer, String>)dht0.peekEx(key);
+        GridDhtCacheEntry<Integer, String> e1 = (GridDhtCacheEntry<Integer, String>)dht1.peekEx(key);
+
+        assertTrue(e0 == null || e0.readers().isEmpty());
+        assertTrue(e1 == null || e1.readers().isEmpty());
+
+        // Get value on other node.
+        assertEquals(val, near1.get(key));
+
+        assertNotNull(e0);
+
+        assertTrue(e0.readers().contains(other.id()));
+        assertTrue(e1 == null || e1.readers().isEmpty());
+
+        // JUnit assertion: evictInternal() must run even when JVM assertions are disabled.
+        assertFalse(e0.evictInternal(false, dht0.context().versions().next(), null));
+
+        assertEquals(1, near0.size());
+        assertEquals(1, dht0.size());
+
+        assertEquals(1, near1.size());
+        assertEquals(0, dht1.size());
+
+        assertFalse(e0.evictInternal(true, dht0.context().versions().next(), null));
+
+        assertEquals(1, near0.size());
+        assertEquals(1, dht0.size());
+
+        assertEquals(1, near1.size());
+        assertEquals(0, dht1.size());
+    }
+
+    /**
+     * Resolves the primary node of the key's partition through the affinity of grid 0 and picks
+     * the first of the remaining nodes known to grid 0 as the "other" node.
+     *
+     * @param key Key.
+     * @return For the given key pair {primary node, some other node}.
+     */
+    private IgniteBiTuple<ClusterNode, ClusterNode> getNodes(Integer key) {
+        GridCacheAffinity<Integer> aff = grid(0).<Integer, Object>cache(null).affinity();
+
+        int part = aff.partition(key);
+
+        ClusterNode primary = aff.mapPartitionToNode(part);
+
+        assertNotNull(primary);
+
+        Collection<ClusterNode> nodes = new ArrayList<>(grid(0).nodes());
+
+        nodes.remove(primary);
+
+        ClusterNode other = F.first(nodes);
+
+        assertNotNull(other);
+
+        assertFalse(F.eqNodes(primary, other));
+
+        return F.t(primary, other);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySetSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySetSelfTest.java
new file mode 100644
index 0000000..595ac32
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySetSelfTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+
+/**
+ * Variant of {@link GridCacheEntrySetAbstractSelfTest} that reports {@code PARTITIONED} cache mode
+ * and adjusts the parent cache configuration to use {@code PARTITIONED_ONLY} distribution mode
+ * with the number of backups set to the value of {@code gridCount()}.
+ */
+public class GridCacheDhtEntrySetSelfTest extends GridCacheEntrySetAbstractSelfTest {
+    /**
+     * {@inheritDoc}
+     *
+     * @return Always {@code GridCacheMode.PARTITIONED}.
+     */
+    @Override protected GridCacheMode cacheMode() {
+        return GridCacheMode.PARTITIONED;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Takes the configuration built by the parent class and overrides its distribution mode
+     * and number of backups.
+     *
+     * @param gridName Grid name, passed through to the parent implementation.
+     * @return The adjusted parent configuration.
+     * @throws Exception If the parent implementation fails.
+     */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        CacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+        ccfg.setDistributionMode(GridCacheDistributionMode.PARTITIONED_ONLY);
+
+        int backups = gridCount();
+
+        ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+}