http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java deleted file mode 100644 index bd2d0b6..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java +++ /dev/null @@ -1,358 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.transactions.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import javax.cache.configuration.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.cache.GridCacheAtomicityMode.*; -import static org.apache.ignite.cache.GridCacheDistributionMode.*; -import static org.apache.ignite.transactions.IgniteTxConcurrency.*; -import static org.apache.ignite.transactions.IgniteTxIsolation.*; - -/** - * Basic store test. - */ -public abstract class GridCacheWriteBehindStoreAbstractTest extends GridCommonAbstractTest { - /** Flush frequency. */ - private static final int WRITE_FROM_BEHIND_FLUSH_FREQUENCY = 1000; - - /** Cache store. */ - private static final GridCacheTestStore store = new GridCacheTestStore(); - - /** - * - */ - protected GridCacheWriteBehindStoreAbstractTest() { - super(true /*start grid. */); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - store.resetTimestamp(); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - GridCache<?, ?> cache = cache(); - - if (cache != null) - cache.clearAll(); - - store.reset(); - } - - /** @return Caching mode. */ - protected abstract GridCacheMode cacheMode(); - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected final IgniteConfiguration getConfiguration() throws Exception { - IgniteConfiguration c = super.getConfiguration(); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(new TcpDiscoveryVmIpFinder(true)); - - c.setDiscoverySpi(disco); - - CacheConfiguration cc = defaultCacheConfiguration(); - - cc.setCacheMode(cacheMode()); - cc.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); - cc.setSwapEnabled(false); - cc.setAtomicityMode(TRANSACTIONAL); - cc.setDistributionMode(NEAR_PARTITIONED); - - cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); - cc.setReadThrough(true); - cc.setWriteThrough(true); - cc.setLoadPreviousValue(true); - - cc.setWriteBehindEnabled(true); - cc.setWriteBehindFlushFrequency(WRITE_FROM_BEHIND_FLUSH_FREQUENCY); - - c.setCacheConfiguration(cc); - - return c; - } - - /** @throws Exception If test fails. */ - public void testWriteThrough() throws Exception { - GridCache<Integer, String> cache = cache(); - - Map<Integer, String> map = store.getMap(); - - assert map.isEmpty(); - - IgniteTx tx = cache.txStart(OPTIMISTIC, REPEATABLE_READ); - - try { - for (int i = 1; i <= 10; i++) { - cache.putx(i, Integer.toString(i)); - - checkLastMethod(null); - } - - tx.commit(); - } - finally { - tx.close(); - } - - // Need to wait WFB flush timeout. - U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); - - checkLastMethod("putAll"); - - assert cache.size() == 10; - - for (int i = 1; i <= 10; i++) { - String val = map.get(i); - - assert val != null; - assert val.equals(Integer.toString(i)); - } - - store.resetLastMethod(); - - tx = cache.txStart(); - - try { - for (int i = 1; i <= 10; i++) { - String val = cache.remove(i); - - checkLastMethod(null); - - assert val != null; - assert val.equals(Integer.toString(i)); - } - - tx.commit(); - } - finally { - tx.close(); - } - - // Need to wait WFB flush timeout. - U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); - - checkLastMethod("removeAll"); - - assert map.isEmpty(); - } - - /** @throws Exception If test failed. */ - public void testReadThrough() throws Exception { - GridCache<Integer, String> cache = cache(); - - Map<Integer, String> map = store.getMap(); - - assert map.isEmpty(); - - try (IgniteTx tx = cache.txStart(OPTIMISTIC, REPEATABLE_READ)) { - for (int i = 1; i <= 10; i++) - cache.putx(i, Integer.toString(i)); - - checkLastMethod(null); - - tx.commit(); - } - - // Need to wait WFB flush timeout. - U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); - - checkLastMethod("putAll"); - - for (int i = 1; i <= 10; i++) { - String val = map.get(i); - - assert val != null; - assert val.equals(Integer.toString(i)); - } - - cache.clearAll(); - - assert cache.isEmpty(); - assert cache.isEmpty(); - - // Need to wait WFB flush timeout. - U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); - - assert map.size() == 10; - - for (int i = 1; i <= 10; i++) { - // Read through. - String val = cache.get(i); - - checkLastMethod("load"); - - assert val != null; - assert val.equals(Integer.toString(i)); - } - - assert cache.size() == 10; - - cache.clearAll(); - - assert cache.isEmpty(); - assert cache.isEmpty(); - - assert map.size() == 10; - - Collection<Integer> keys = new ArrayList<>(); - - for (int i = 1; i <= 10; i++) - keys.add(i); - - // Read through. - Map<Integer, String> vals = cache.getAll(keys); - - checkLastMethod("loadAll"); - - assert vals != null; - assert vals.size() == 10; - - for (int i = 1; i <= 10; i++) { - String val = vals.get(i); - - assert val != null; - assert val.equals(Integer.toString(i)); - } - - // Write through. - cache.removeAll(keys); - - // Need to wait WFB flush timeout. - U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); - - checkLastMethod("removeAll"); - - assert cache.isEmpty(); - assert cache.isEmpty(); - - assert map.isEmpty(); - } - - /** @throws Exception If failed. */ - public void testMultithreaded() throws Exception { - final ConcurrentMap<String, Set<Integer>> perThread = new ConcurrentHashMap<>(); - - final AtomicBoolean running = new AtomicBoolean(true); - - final GridCache<Integer, String> cache = cache(); - - IgniteFuture<?> fut = multithreadedAsync(new Runnable() { - @SuppressWarnings({"NullableProblems"}) - @Override public void run() { - // Initialize key set for this thread. - Set<Integer> set = new HashSet<>(); - - Set<Integer> old = perThread.putIfAbsent(Thread.currentThread().getName(), set); - - if (old != null) - set = old; - - Random rnd = new Random(); - - try { - int keyCnt = 20000; - - while (running.get()) { - int op = rnd.nextInt(2); - int key = rnd.nextInt(keyCnt); - - switch (op) { - case 0: - cache.put(key, "val" + key); - set.add(key); - - break; - - case 1: - default: - cache.remove(key); - set.remove(key); - - break; - } - } - } - catch (IgniteCheckedException e) { - error("Unexpected exception in put thread", e); - - assert false; - } - } - }, 10, "put"); - - U.sleep(10000); - - running.set(false); - - fut.get(); - - U.sleep(5 * WRITE_FROM_BEHIND_FLUSH_FREQUENCY); - - Map<Integer, String> stored = store.getMap(); - - for (Map.Entry<Integer, String> entry : stored.entrySet()) { - int key = entry.getKey(); - - assertEquals("Invalid value for key " + key, "val" + key, entry.getValue()); - - boolean found = false; - - for (Set<Integer> threadPuts : perThread.values()) { - if (threadPuts.contains(key)) { - found = true; - - break; - } - } - - assert found : "No threads found that put key " + key; - } - } - - /** @param mtd Expected last method value. */ - private void checkLastMethod(@Nullable String mtd) { - String lastMtd = store.getLastMethod(); - - if (mtd == null) - assert lastMtd == null : "Last method must be null: " + lastMtd; - else { - assert lastMtd != null : "Last method must be not null"; - assert lastMtd.equals(mtd) : "Last method does not match [expected=" + mtd + ", lastMtd=" + lastMtd + ']'; - } - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreLocalTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreLocalTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreLocalTest.java deleted file mode 100644 index bfe5103..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreLocalTest.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.cache.*; -import org.apache.ignite.internal.processors.cache.*; - -/** - * Tests {@link GridCacheWriteBehindStore} in grid configuration. - */ -public class GridCacheWriteBehindStoreLocalTest extends GridCacheWriteBehindStoreAbstractTest { - /** {@inheritDoc} */ - @Override protected GridCacheMode cacheMode() { - return GridCacheMode.LOCAL; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java deleted file mode 100644 index 709ba67..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; - -/** - * Multithreaded tests for {@link GridCacheWriteBehindStore}. - */ -public class GridCacheWriteBehindStoreMultithreadedSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest { - /** - * This test performs complex set of operations on store from multiple threads. - * - * @throws Exception If failed. - */ - public void testPutGetRemove() throws Exception { - initStore(2); - - Set<Integer> exp; - - try { - exp = runPutGetRemoveMultithreaded(10, 10); - } - finally { - shutdownStore(); - } - - Map<Integer, String> map = delegate.getMap(); - - Collection<Integer> extra = new HashSet<>(map.keySet()); - - extra.removeAll(exp); - - assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty()); - - Collection<Integer> missing = new HashSet<>(exp); - - missing.removeAll(map.keySet()); - - assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty()); - - for (Integer key : exp) - assertEquals("Invalid value for key " + key, "val" + key, map.get(key)); - } - - /** - * Tests that cache would keep values if underlying store fails. - * - * @throws Exception If failed. - */ - public void testStoreFailure() throws Exception { - delegate.setShouldFail(true); - - initStore(2); - - Set<Integer> exp; - - try { - exp = runPutGetRemoveMultithreaded(10, 10); - - U.sleep(FLUSH_FREQUENCY); - - info(">>> There are " + store.getWriteBehindErrorRetryCount() + " entries in RETRY state"); - - delegate.setShouldFail(false); - - // Despite that we set shouldFail flag to false, flush thread may just have caught an exception. - // If we move store to the stopping state right away, this value will be lost. That's why this sleep - // is inserted here to let all exception handlers in write-behind store exit. - U.sleep(1000); - } - finally { - shutdownStore(); - } - - Map<Integer, String> map = delegate.getMap(); - - Collection<Integer> extra = new HashSet<>(map.keySet()); - - extra.removeAll(exp); - - assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty()); - - Collection<Integer> missing = new HashSet<>(exp); - - missing.removeAll(map.keySet()); - - assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty()); - - for (Integer key : exp) - assertEquals("Invalid value for key " + key, "val" + key, map.get(key)); - } - - /** - * Tests store consistency in case of high put rate, when flush is performed from the same thread - * as put or remove operation. - * - * @throws Exception If failed. - */ - public void testFlushFromTheSameThread() throws Exception { - // 50 milliseconds should be enough. - delegate.setOperationDelay(50); - - initStore(2); - - Set<Integer> exp; - - int start = store.getWriteBehindTotalCriticalOverflowCount(); - - try { - //We will have in total 5 * CACHE_SIZE keys that should be enough to grow map size to critical value. - exp = runPutGetRemoveMultithreaded(5, CACHE_SIZE); - } - finally { - log.info(">>> Done inserting, shutting down the store"); - - shutdownStore(); - } - - // Restore delay. - delegate.setOperationDelay(0); - - Map<Integer, String> map = delegate.getMap(); - - int end = store.getWriteBehindTotalCriticalOverflowCount(); - - log.info(">>> There are " + exp.size() + " keys in store, " + (end - start) + " overflows detected"); - - assertTrue("No cache overflows detected (a bug or too few keys or too few delay?)", end > start); - - Collection<Integer> extra = new HashSet<>(map.keySet()); - - extra.removeAll(exp); - - assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty()); - - Collection<Integer> missing = new HashSet<>(exp); - - missing.removeAll(map.keySet()); - - assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty()); - - for (Integer key : exp) - assertEquals("Invalid value for key " + key, "val" + key, map.get(key)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java deleted file mode 100644 index 5087fc1..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.store.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.transactions.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.junits.common.*; - -import javax.cache.configuration.*; -import java.util.*; - -import static org.apache.ignite.cache.GridCacheAtomicityMode.*; -import static org.apache.ignite.cache.GridCacheDistributionMode.*; -import static org.apache.ignite.transactions.IgniteTxConcurrency.*; -import static org.apache.ignite.transactions.IgniteTxIsolation.*; - -/** - * Tests write-behind store with near and dht commit option. - */ -public class GridCacheWriteBehindStorePartitionedMultiNodeSelfTest extends GridCommonAbstractTest { - /** Grids to start. */ - private static final int GRID_CNT = 5; - - /** Ip finder. */ - private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** Flush frequency. */ - public static final int WRITE_BEHIND_FLUSH_FREQ = 1000; - - /** Stores per grid. */ - private GridCacheTestStore[] stores = new GridCacheTestStore[GRID_CNT]; - - /** Start grid counter. */ - private int idx; - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration c = super.getConfiguration(gridName); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(ipFinder); - - c.setDiscoverySpi(disco); - - CacheConfiguration cc = defaultCacheConfiguration(); - - cc.setCacheMode(GridCacheMode.PARTITIONED); - cc.setWriteBehindEnabled(true); - cc.setWriteBehindFlushFrequency(WRITE_BEHIND_FLUSH_FREQ); - cc.setAtomicityMode(TRANSACTIONAL); - cc.setDistributionMode(NEAR_PARTITIONED); - - CacheStore store = stores[idx] = new GridCacheTestStore(); - - cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); - cc.setReadThrough(true); - cc.setWriteThrough(true); - cc.setLoadPreviousValue(true); - - c.setCacheConfiguration(cc); - - idx++; - - return c; - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stores = null; - - super.afterTestsStopped(); - } - - /** - * @throws Exception If failed. - */ - private void prepare() throws Exception { - idx = 0; - - startGrids(GRID_CNT); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - super.afterTest(); - - stopAllGrids(); - } - - /** - * @throws Exception If failed. - */ - public void testSingleWritesOnDhtNode() throws Exception { - checkSingleWrites(); - } - - /** - * @throws Exception If failed. - */ - public void testBatchWritesOnDhtNode() throws Exception { - checkBatchWrites(); - } - - /** - * @throws Exception If failed. - */ - public void testTxWritesOnDhtNode() throws Exception { - checkTxWrites(); - } - - /** - * @throws Exception If failed. - */ - private void checkSingleWrites() throws Exception { - prepare(); - - GridCache<Integer, String> cache = grid(0).cache(null); - - for (int i = 0; i < 100; i++) - cache.put(i, String.valueOf(i)); - - checkWrites(); - } - - /** - * @throws Exception If failed. - */ - private void checkBatchWrites() throws Exception { - prepare(); - - Map<Integer, String> map = new HashMap<>(); - - for (int i = 0; i < 100; i++) - map.put(i, String.valueOf(i)); - - grid(0).cache(null).putAll(map); - - checkWrites(); - } - - /** - * @throws Exception If failed. - */ - private void checkTxWrites() throws Exception { - prepare(); - - GridCache<Object, Object> cache = grid(0).cache(null); - - try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { - for (int i = 0; i < 100; i++) - cache.put(i, String.valueOf(i)); - - tx.commit(); - } - - checkWrites(); - } - - /** - * @throws IgniteInterruptedException If sleep was interrupted. - */ - private void checkWrites() throws IgniteInterruptedException { - U.sleep(WRITE_BEHIND_FLUSH_FREQ * 2); - - Collection<Integer> allKeys = new ArrayList<>(100); - - for (int i = 0; i < GRID_CNT; i++) { - Map<Integer,String> map = stores[i].getMap(); - - assertFalse("Missing writes for node: " + i, map.isEmpty()); - - allKeys.addAll(map.keySet()); - - // Check there is no intersection. - for (int j = 0; j < GRID_CNT; j++) { - if (i == j) - continue; - - Collection<Integer> intersection = new HashSet<>(stores[j].getMap().keySet()); - - intersection.retainAll(map.keySet()); - - assertTrue(intersection.isEmpty()); - } - } - - assertEquals(100, allKeys.size()); - - for (int i = 0; i < 100; i++) - assertTrue(allKeys.contains(i)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java deleted file mode 100644 index 12ca535..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.cache.*; -import org.apache.ignite.internal.processors.cache.*; - -/** - * Tests {@link GridCacheWriteBehindStore} in partitioned configuration. - */ -public class GridCacheWriteBehindStorePartitionedTest extends GridCacheWriteBehindStoreAbstractTest { - /** {@inheritDoc} */ - @Override protected GridCacheMode cacheMode() { - return GridCacheMode.PARTITIONED; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java deleted file mode 100644 index f28d719..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.cache.*; -import org.apache.ignite.internal.processors.cache.*; - -/** - * Tests {@link GridCacheWriteBehindStore} in grid configuration. - */ -public class GridCacheWriteBehindStoreReplicatedTest extends GridCacheWriteBehindStoreAbstractTest { - /** {@inheritDoc} */ - @Override protected GridCacheMode cacheMode() { - return GridCacheMode.REPLICATED; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreSelfTest.java deleted file mode 100644 index 42c8d17..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStoreSelfTest.java +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; - -import java.util.*; -import java.util.concurrent.atomic.*; - -/** - * This class provides basic tests for {@link GridCacheWriteBehindStore}. - */ -public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest { - /** - * Tests correct store shutdown when underlying store fails, - * - * @throws Exception If failed. - */ - public void testShutdownWithFailure() throws Exception { - final AtomicReference<Exception> err = new AtomicReference<>(); - - multithreadedAsync(new Runnable() { - @Override public void run() { - try { - delegate.setShouldFail(true); - - initStore(2); - - try { - store.write(new CacheEntryImpl<>(1, "val1")); - store.write(new CacheEntryImpl<>(2, "val2")); - } - finally { - shutdownStore(); - - delegate.setShouldFail(false); - } - } - catch (Exception e) { - err.set(e); - } - } - }, 1).get(); - - if (err.get() != null) - throw err.get(); - } - - /** - * @throws Exception If failed. - */ - public void testSimpleStore() throws Exception { - initStore(2); - - try { - store.write(new CacheEntryImpl<>(1, "v1")); - store.write(new CacheEntryImpl<>(2, "v2")); - - assertEquals("v1", store.load(1)); - assertEquals("v2", store.load(2)); - assertNull(store.load(3)); - - store.delete(1); - - assertNull(store.load(1)); - assertEquals("v2", store.load(2)); - assertNull(store.load(3)); - } - finally { - shutdownStore(); - } - } - - /** - * Check that all values written to the store will be in underlying store after timeout or due to size limits. - * - * @throws Exception If failed. - */ - @SuppressWarnings({"NullableProblems"}) - public void testValuePropagation() throws Exception { - // Need to test size-based write. - initStore(1); - - try { - for (int i = 0; i < CACHE_SIZE * 2; i++) - store.write(new CacheEntryImpl<>(i, "val" + i)); - - U.sleep(200); - - for (int i = 0; i < CACHE_SIZE; i++) { - String val = delegate.load(i); - - assertNotNull("Value for [key= " + i + "] was not written in store", val); - assertEquals("Invalid value [key=" + i + "]", "val" + i, val); - } - - U.sleep(FLUSH_FREQUENCY + 300); - - for (int i = CACHE_SIZE; i < CACHE_SIZE * 2; i++) { - String val = delegate.load(i); - - assertNotNull("Value for [key= " + i + "] was not written in store", val); - assertEquals("Invalid value [key=" + i + "]", "val" + i, val); - } - } - finally { - shutdownStore(); - } - } - - /** - * Tests store behaviour under continuous put of the same key with different values. - * - * @throws Exception If failed - */ - public void testContinuousPut() throws Exception { - initStore(2); - - try { - final AtomicBoolean running = new AtomicBoolean(true); - - final AtomicInteger actualPutCnt = new AtomicInteger(); - - IgniteFuture<?> fut = multithreadedAsync(new Runnable() { - @SuppressWarnings({"NullableProblems"}) - @Override public void run() { - try { - while (running.get()) { - for (int i = 0; i < CACHE_SIZE; i++) { - store.write(new CacheEntryImpl<>(i, "val-0")); - - actualPutCnt.incrementAndGet(); - - store.write(new CacheEntryImpl<>(i, "val" + i)); - - actualPutCnt.incrementAndGet(); - } - } - } - catch (Exception e) { - error("Unexpected exception in put thread", e); - - assert false; - } - } - }, 1, "put"); - - U.sleep(FLUSH_FREQUENCY * 2 + 500); - - int delegatePutCnt = delegate.getPutAllCount(); - - running.set(false); - - fut.get(); - - log().info(">>> [putCnt = " + actualPutCnt.get() + ", delegatePutCnt=" + delegatePutCnt + "]"); - - assertTrue("No puts were made to the underlying store", delegatePutCnt > 0); - assertTrue("Too many puts were made to the underlying store", delegatePutCnt < actualPutCnt.get() / 10); - } - finally { - shutdownStore(); - } - - // These checks must be done after the store shut down - assertEquals("Invalid store size", CACHE_SIZE, delegate.getMap().size()); - - for (int i = 0; i < CACHE_SIZE; i++) - assertEquals("Invalid value stored", "val" + i, delegate.getMap().get(i)); - } - - /** - * Tests that all values were put into the store will be written to the underlying store - * after shutdown is called. - * - * @throws Exception If failed. - */ - public void testShutdown() throws Exception { - initStore(2); - - try { - final AtomicBoolean running = new AtomicBoolean(true); - - IgniteFuture<?> fut = multithreadedAsync(new Runnable() { - @SuppressWarnings({"NullableProblems"}) - @Override public void run() { - try { - while (running.get()) { - for (int i = 0; i < CACHE_SIZE; i++) { - store.write(new CacheEntryImpl<>(i, "val-0")); - - store.write(new CacheEntryImpl<>(i, "val" + i)); - } - } - } - catch (Exception e) { - error("Unexpected exception in put thread", e); - - assert false; - } - } - }, 1, "put"); - - U.sleep(300); - - running.set(false); - - fut.get(); - } - finally { - shutdownStore(); - } - - // These checks must be done after the store shut down - assertEquals("Invalid store size", CACHE_SIZE, delegate.getMap().size()); - - for (int i = 0; i < CACHE_SIZE; i++) - assertEquals("Invalid value stored", "val" + i, delegate.getMap().get(i)); - } - - /** - * Tests that all values will be written to the underlying store - * right in the same order as they were put into the store. - * - * @throws Exception If failed. - */ - public void testBatchApply() throws Exception { - delegate = new GridCacheTestStore(new ConcurrentLinkedHashMap<Integer, String>()); - - initStore(1); - - List<Integer> intList = new ArrayList<>(CACHE_SIZE); - - try { - for (int i = 0; i < CACHE_SIZE; i++) { - store.write(new CacheEntryImpl<>(i, "val" + i)); - - intList.add(i); - } - } - finally { - shutdownStore(); - } - - Map<Integer, String> underlyingMap = delegate.getMap(); - - assertTrue("Store map key set: " + underlyingMap.keySet(), F.eqOrdered(underlyingMap.keySet(), intList)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java deleted file mode 100644 index 94f1b24..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java +++ /dev/null @@ -1,492 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.transactions.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.transactions.IgniteTxConcurrency.*; -import static org.apache.ignite.transactions.IgniteTxIsolation.*; - -/** - * Tests for local transactions. - */ -@SuppressWarnings( {"BusyWait"}) -abstract class IgniteTxAbstractTest extends GridCommonAbstractTest { - /** Random number generator. */ - private static final Random RAND = new Random(); - - /** Execution count. */ - private static final AtomicInteger cntr = new AtomicInteger(); - - /** */ - private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** - * Start grid by default. - */ - protected IgniteTxAbstractTest() { - super(false /*start grid. */); - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration c = super.getConfiguration(gridName); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(ipFinder); - - c.setDiscoverySpi(disco); - - return c; - } - - /** - * @return Grid count. - */ - protected abstract int gridCount(); - - /** - * @return Key count. - */ - protected abstract int keyCount(); - - /** - * @return Maximum key value. - */ - protected abstract int maxKeyValue(); - - /** - * @return Thread iterations. - */ - protected abstract int iterations(); - - /** - * @return True if in-test logging is enabled. - */ - protected abstract boolean isTestDebug(); - - /** - * @return {@code True} if memory stats should be printed. - */ - protected abstract boolean printMemoryStats(); - - /** {@inheritDoc} */ - private void debug(String msg) { - if (isTestDebug()) - info(msg); - } - - /** - * @throws Exception If failed. - */ - @Override protected void beforeTestsStarted() throws Exception { - for (int i = 0; i < gridCount(); i++) - startGrid(i); - } - - /** - * @throws Exception If failed. - */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - } - - /** - * @param i Grid index. - * @return Cache. - */ - @SuppressWarnings("unchecked") - @Override protected GridCache<Integer, String> cache(int i) { - return grid(i).cache(null); - } - - /** - * @return Keys. - */ - protected Iterable<Integer> getKeys() { - List<Integer> keys = new ArrayList<>(keyCount()); - - for (int i = 0; i < keyCount(); i++) - keys.add(RAND.nextInt(maxKeyValue()) + 1); - - Collections.sort(keys); - - return Collections.unmodifiableList(keys); - } - - /** - * @return Random cache operation. - */ - protected OP getOp() { - switch (RAND.nextInt(3)) { - case 0: { return OP.READ; } - case 1: { return OP.WRITE; } - case 2: { return OP.REMOVE; } - - // Should never be reached. - default: { assert false; return null; } - } - } - - /** - * @param concurrency Concurrency. - * @param isolation Isolation. - * @throws Exception If check failed. - */ - protected void checkCommit(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception { - int gridIdx = RAND.nextInt(gridCount()); - - Ignite ignite = grid(gridIdx); - - if (isTestDebug()) - debug("Checking commit on grid: " + ignite.cluster().localNode().id()); - - for (int i = 0; i < iterations(); i++) { - GridCache<Integer, String> cache = cache(gridIdx); - - IgniteTx tx = cache.txStart(concurrency, isolation, 0, 0); - - try { - int prevKey = -1; - - for (Integer key : getKeys()) { - // Make sure we have the same locking order for all concurrent transactions. - assert key >= prevKey : "key: " + key + ", prevKey: " + prevKey; - - if (isTestDebug()) { - GridCacheAffinityFunction aff = cache.configuration().getAffinity(); - - int part = aff.partition(key); - - debug("Key affinity [key=" + key + ", partition=" + part + ", affinity=" + - U.toShortString(cache.affinity().mapPartitionToPrimaryAndBackups(part)) + ']'); - } - - String val = Integer.toString(key); - - switch (getOp()) { - case READ: { - if (isTestDebug()) - debug("Reading key [key=" + key + ", i=" + i + ']'); - - val = cache.get(key); - - if (isTestDebug()) - debug("Read value for key [key=" + key + ", val=" + val + ']'); - - break; - } - - case WRITE: { - if (isTestDebug()) - debug("Writing key and value [key=" + key + ", val=" + val + ", i=" + i + ']'); - - cache.put(key, val); - - break; - } - - case REMOVE: { - if (isTestDebug()) - debug("Removing key [key=" + key + ", i=" + i + ']'); - - cache.remove(key); - - break; - } - - default: { assert false; } - } - } - - tx.commit(); - - if (isTestDebug()) - debug("Committed transaction [i=" + i + ", tx=" + tx + ']'); - } - catch (IgniteTxOptimisticException e) { - if (concurrency != OPTIMISTIC || isolation != SERIALIZABLE) { - error("Received invalid optimistic failure.", e); - - throw e; - } - - if (isTestDebug()) - info("Optimistic transaction failure (will rollback) [i=" + i + ", msg=" + e.getMessage() + - ", tx=" + tx.xid() + ']'); - - try { - tx.rollback(); - } - catch (IgniteCheckedException ex) { - error("Failed to rollback optimistic failure: " + tx, ex); - - throw ex; - } - } - catch (Exception e) { - error("Transaction failed (will rollback): " + tx, e); - - tx.rollback(); - - throw e; - } - catch (Error e) { - error("Error when executing transaction (will rollback): " + tx, e); - - tx.rollback(); - - throw e; - } - finally { - IgniteTx t = cache.tx(); - - assert t == null : "Thread should not have transaction upon completion ['t==tx'=" + (t == tx) + - ", t=" + t + (t != tx ? "tx=" + tx : "tx=''") + ']'; - } - } - - if (printMemoryStats()) { - if (cntr.getAndIncrement() % 100 == 0) - // Print transaction memory stats. - ((GridKernal)grid(gridIdx)).internalCache().context().tm().printMemoryStats(); - } - } - - /** - * @param concurrency Concurrency. - * @param isolation Isolation. - * @throws IgniteCheckedException If check failed. - */ - protected void checkRollback(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) - throws Exception { - checkRollback(new ConcurrentHashMap<Integer, String>(), concurrency, isolation); - } - - /** - * @param map Map to check. - * @param concurrency Concurrency. - * @param isolation Isolation. - * @throws IgniteCheckedException If check failed. - */ - protected void checkRollback(ConcurrentMap<Integer, String> map, IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation) throws Exception { - int gridIdx = RAND.nextInt(gridCount()); - - Ignite ignite = grid(gridIdx); - - if (isTestDebug()) - debug("Checking commit on grid: " + ignite.cluster().localNode().id()); - - for (int i = 0; i < iterations(); i++) { - GridCache<Integer, String> cache = cache(gridIdx); - - IgniteTx tx = cache.txStart(concurrency, isolation, 0, 0); - - try { - for (Integer key : getKeys()) { - if (isTestDebug()) { - GridCacheAffinityFunction aff = cache.configuration().getAffinity(); - - int part = aff.partition(key); - - debug("Key affinity [key=" + key + ", partition=" + part + ", affinity=" + - U.toShortString(cache.affinity().mapPartitionToPrimaryAndBackups(part)) + ']'); - } - - String val = Integer.toString(key); - - switch (getOp()) { - case READ: { - debug("Reading key: " + key); - - checkMap(map, key, cache.get(key)); - - break; - } - - case WRITE: { - debug("Writing key and value [key=" + key + ", val=" + val + ']'); - - checkMap(map, key, cache.put(key, val)); - - break; - } - - case REMOVE: { - debug("Removing key: " + key); - - checkMap(map, key, cache.remove(key)); - - break; - } - - default: { assert false; } - } - } - - tx.rollback(); - - debug("Rolled back transaction: " + tx); - } - catch (IgniteTxOptimisticException e) { - tx.rollback(); - - log.warning("Rolled back transaction due to optimistic exception [tx=" + tx + ", e=" + e + ']'); - - throw e; - } - catch (Exception e) { - tx.rollback(); - - error("Rolled back transaction due to exception [tx=" + tx + ", e=" + e + ']'); - - throw e; - } - finally { - IgniteTx t1 = cache.tx(); - - debug("t1=" + t1); - - assert t1 == null : "Thread should not have transaction upon completion ['t==tx'=" + (t1 == tx) + - ", t=" + t1 + ']'; - } - } - } - - /** - * @param map Map to check against. - * @param key Key. - * @param val Value. - */ - private void checkMap(ConcurrentMap<Integer, String> map, Integer key, String val) { - if (val != null) { - String v = map.putIfAbsent(key, val); - - assert v == null || v.equals(val); - } - } - - /** - * Checks integrity of all caches after tests. - * - * @throws IgniteCheckedException If check failed. - */ - @SuppressWarnings({"ErrorNotRethrown"}) - protected void finalChecks() throws Exception { - for (int i = 1; i <= maxKeyValue(); i++) { - for (int k = 0; k < 3; k++) { - try { - GridCacheEntry<Integer, String> e1 = null; - - String v1 = null; - - for (int j = 0; j < gridCount(); j++) { - GridCache<Integer, String> cache = cache(j); - - IgniteTx tx = cache.tx(); - - assertNull("Transaction is not completed: " + tx, tx); - - if (j == 0) { - e1 = cache.entry(i); - - v1 = e1.get(); - } - else { - GridCacheEntry<Integer, String> e2 = cache.entry(i); - - String v2 = e2.get(); - - if (!F.eq(v2, v1)) { - v1 = e1.get(); - v2 = e2.get(); - } - - assert F.eq(v2, v1) : - "Invalid cached value [key=" + i + ", v1=" + v1 + ", v2=" + v2 + ", e1=" + e1 + - ", e2=" + e2 + ", grid=" + j + ']'; - } - } - - break; - } - catch (AssertionError e) { - if (k == 2) - throw e; - else - // Wait for transactions to complete. - Thread.sleep(500); - } - } - } - - for (int i = 1; i <= maxKeyValue(); i++) { - for (int k = 0; k < 3; k++) { - try { - for (int j = 0; j < gridCount(); j++) { - GridCacheProjection<Integer, String> cache = cache(j); - - cache.removeAll(); - -// assert cache.keySet().isEmpty() : "Cache is not empty: " + cache.entrySet(); - } - - break; - } - catch (AssertionError e) { - if (k == 2) - throw e; - else - // Wait for transactions to complete. - Thread.sleep(500); - } - } - } - } - - /** - * Cache operation. - */ - protected enum OP { - /** Cache read. */ - READ, - - /** Cache write. */ - WRITE, - - /** Cache remove. */ - REMOVE - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxConcurrentGetAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxConcurrentGetAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxConcurrentGetAbstractTest.java deleted file mode 100644 index 16cb9fb..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxConcurrentGetAbstractTest.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.transactions.*; -import org.apache.ignite.internal.processors.cache.distributed.dht.*; -import org.apache.ignite.internal.processors.cache.distributed.near.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.transactions.IgniteTxConcurrency.*; -import static org.apache.ignite.transactions.IgniteTxIsolation.*; - -/** - * Checks multithreaded put/get cache operations on one node. - */ -public abstract class IgniteTxConcurrentGetAbstractTest extends GridCommonAbstractTest { - /** Debug flag. */ - private static final boolean DEBUG = false; - - /** */ - protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** */ - private static final int THREAD_NUM = 20; - - /** - * Default constructor. - * - */ - protected IgniteTxConcurrentGetAbstractTest() { - super(true /** Start grid. */); - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi spi = new TcpDiscoverySpi(); - - spi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(spi); - - return cfg; - } - - /** - * @param g Grid. - * @return Near cache. - */ - GridNearCacheAdapter<String, Integer> near(Ignite g) { - return (GridNearCacheAdapter<String, Integer>)((GridKernal)g).<String, Integer>internalCache(); - } - - /** - * @param g Grid. - * @return DHT cache. - */ - GridDhtCacheAdapter<String, Integer> dht(Ignite g) { - return near(g).dht(); - } - - /** - * JUnit. - * - * @throws Exception If failed. - */ - public void testPutGet() throws Exception { - // Random key. - final String key = UUID.randomUUID().toString(); - - final Ignite ignite = grid(); - - ignite.cache(null).put(key, "val"); - - GridCacheEntryEx<String,Integer> dhtEntry = dht(ignite).peekEx(key); - - if (DEBUG) - info("DHT entry [hash=" + System.identityHashCode(dhtEntry) + ", entry=" + dhtEntry + ']'); - - String val = txGet(ignite, key); - - assertNotNull(val); - - info("Starting threads: " + THREAD_NUM); - - multithreaded(new Callable<String>() { - @Override public String call() throws Exception { - return txGet(ignite, key); - } - }, THREAD_NUM, "getter-thread"); - } - - /** - * @param ignite Grid. - * @param key Key. - * @return Value. - * @throws Exception If failed. - */ - private String txGet(Ignite ignite, String key) throws Exception { - try (IgniteTx tx = ignite.cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) { - GridCacheEntryEx<String, Integer> dhtEntry = dht(ignite).peekEx(key); - - if (DEBUG) - info("DHT entry [hash=" + System.identityHashCode(dhtEntry) + ", xid=" + tx.xid() + - ", entry=" + dhtEntry + ']'); - - String val = ignite.<String, String>cache(null).get(key); - - assertNotNull(val); - assertEquals("val", val); - - tx.commit(); - - return val; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java deleted file mode 100644 index 65c76ec..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java +++ /dev/null @@ -1,648 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.cache.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.spi.*; -import org.apache.ignite.spi.indexing.*; -import org.apache.ignite.transactions.*; -import org.apache.ignite.internal.processors.cache.distributed.near.*; -import org.apache.ignite.testframework.*; -import org.jetbrains.annotations.*; - -import javax.cache.*; -import javax.cache.processor.*; -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.cache.GridCacheMode.*; - -/** - * Tests that transaction is invalidated in case of {@link IgniteTxHeuristicException}. - */ -public abstract class IgniteTxExceptionAbstractSelfTest extends GridCacheAbstractSelfTest { - /** Index SPI throwing exception. */ - private static TestIndexingSpi idxSpi = new TestIndexingSpi(); - - /** */ - private static final int PRIMARY = 0; - - /** */ - private static final int BACKUP = 1; - - /** */ - private static final int NOT_PRIMARY_AND_BACKUP = 2; - - /** */ - private static Integer lastKey; - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 3; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setIndexingSpi(idxSpi); - - cfg.getTransactionsConfiguration().setTxSerializableEnabled(true); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { - CacheConfiguration ccfg = super.cacheConfiguration(gridName); - - ccfg.setQueryIndexEnabled(true); - ccfg.setCacheStoreFactory(null); - ccfg.setReadThrough(false); - ccfg.setWriteThrough(false); - ccfg.setLoadPreviousValue(true); - - return ccfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - lastKey = 0; - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - idxSpi.forceFail(false); - } - - /** - * @throws Exception If failed. - */ - public void testPutNear() throws Exception { - checkPut(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); - - checkPut(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); - } - - /** - * @throws Exception If failed. - */ - public void testPutPrimary() throws Exception { - checkPut(true, keyForNode(grid(0).localNode(), PRIMARY)); - - checkPut(false, keyForNode(grid(0).localNode(), PRIMARY)); - } - - /** - * @throws Exception If failed. - */ - public void testPutBackup() throws Exception { - checkPut(true, keyForNode(grid(0).localNode(), BACKUP)); - - checkPut(false, keyForNode(grid(0).localNode(), BACKUP)); - } - - /** - * @throws Exception If failed. - */ - public void testPutAll() throws Exception { - checkPutAll(true, keyForNode(grid(0).localNode(), PRIMARY), - keyForNode(grid(0).localNode(), PRIMARY), - keyForNode(grid(0).localNode(), PRIMARY)); - - checkPutAll(false, keyForNode(grid(0).localNode(), PRIMARY), - keyForNode(grid(0).localNode(), PRIMARY), - keyForNode(grid(0).localNode(), PRIMARY)); - - if (gridCount() > 1) { - checkPutAll(true, keyForNode(grid(1).localNode(), PRIMARY), - keyForNode(grid(1).localNode(), PRIMARY), - keyForNode(grid(1).localNode(), PRIMARY)); - - checkPutAll(false, keyForNode(grid(1).localNode(), PRIMARY), - keyForNode(grid(1).localNode(), PRIMARY), - keyForNode(grid(1).localNode(), PRIMARY)); - } - } - - /** - * @throws Exception If failed. - */ - public void testRemoveNear() throws Exception { - checkRemove(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); - - checkRemove(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); - } - - /** - * @throws Exception If failed. - */ - public void testRemovePrimary() throws Exception { - checkRemove(false, keyForNode(grid(0).localNode(), PRIMARY)); - - checkRemove(true, keyForNode(grid(0).localNode(), PRIMARY)); - } - - /** - * @throws Exception If failed. - */ - public void testRemoveBackup() throws Exception { - checkRemove(false, keyForNode(grid(0).localNode(), BACKUP)); - - checkRemove(true, keyForNode(grid(0).localNode(), BACKUP)); - } - - /** - * @throws Exception If failed. - */ - public void testTransformNear() throws Exception { - checkTransform(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); - - checkTransform(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); - } - - /** - * @throws Exception If failed. - */ - public void testTransformPrimary() throws Exception { - checkTransform(false, keyForNode(grid(0).localNode(), PRIMARY)); - - checkTransform(true, keyForNode(grid(0).localNode(), PRIMARY)); - } - - /** - * @throws Exception If failed. - */ - public void testTransformBackup() throws Exception { - checkTransform(false, keyForNode(grid(0).localNode(), BACKUP)); - - checkTransform(true, keyForNode(grid(0).localNode(), BACKUP)); - } - - /** - * @throws Exception If failed. - */ - public void testPutNearTx() throws Exception { - for (IgniteTxConcurrency concurrency : IgniteTxConcurrency.values()) { - for (IgniteTxIsolation isolation : IgniteTxIsolation.values()) { - checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); - - checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); - } - } - } - - /** - * @throws Exception If failed. - */ - public void testPutPrimaryTx() throws Exception { - for (IgniteTxConcurrency concurrency : IgniteTxConcurrency.values()) { - for (IgniteTxIsolation isolation : IgniteTxIsolation.values()) { - checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), PRIMARY)); - - checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), PRIMARY)); - } - } - } - - /** - * @throws Exception If failed. - */ - public void testPutBackupTx() throws Exception { - for (IgniteTxConcurrency concurrency : IgniteTxConcurrency.values()) { - for (IgniteTxIsolation isolation : IgniteTxIsolation.values()) { - checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), BACKUP)); - - checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), BACKUP)); - } - } - } - - /** - * @throws Exception If failed. - */ - public void testPutMultipleKeysTx() throws Exception { - for (IgniteTxConcurrency concurrency : IgniteTxConcurrency.values()) { - for (IgniteTxIsolation isolation : IgniteTxIsolation.values()) { - checkPutTx(true, concurrency, isolation, - keyForNode(grid(0).localNode(), PRIMARY), - keyForNode(grid(0).localNode(), PRIMARY), - keyForNode(grid(0).localNode(), PRIMARY)); - - checkPutTx(false, concurrency, isolation, - keyForNode(grid(0).localNode(), PRIMARY), - keyForNode(grid(0).localNode(), PRIMARY), - keyForNode(grid(0).localNode(), PRIMARY)); - - if (gridCount() > 1) { - checkPutTx(true, concurrency, isolation, - keyForNode(grid(1).localNode(), PRIMARY), - keyForNode(grid(1).localNode(), PRIMARY), - keyForNode(grid(1).localNode(), PRIMARY)); - - checkPutTx(false, concurrency, isolation, - keyForNode(grid(1).localNode(), PRIMARY), - keyForNode(grid(1).localNode(), PRIMARY), - keyForNode(grid(1).localNode(), PRIMARY)); - } - } - } - } - - /** - * @param putBefore If {@code true} then puts some value before executing failing operation. - * @param keys Keys. - * @param concurrency Transaction concurrency. - * @param isolation Transaction isolation. - * @throws Exception If failed. - */ - private void checkPutTx(boolean putBefore, IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, final Integer... keys) throws Exception { - assertTrue(keys.length > 0); - - info("Test transaction [concurrency=" + concurrency + ", isolation=" + isolation + ']'); - - GridCache<Integer, Integer> cache = grid(0).cache(null); - - if (putBefore) { - idxSpi.forceFail(false); - - info("Start transaction."); - - try (IgniteTx tx = cache.txStart(concurrency, isolation)) { - for (Integer key : keys) { - info("Put " + key); - - cache.put(key, 1); - } - - info("Commit."); - - tx.commit(); - } - } - - // Execute get from all nodes to create readers for near cache. - for (int i = 0; i < gridCount(); i++) { - for (Integer key : keys) - grid(i).cache(null).get(key); - } - - idxSpi.forceFail(true); - - try { - info("Start transaction."); - - try (IgniteTx tx = cache.txStart(concurrency, isolation)) { - for (Integer key : keys) { - info("Put " + key); - - cache.put(key, 2); - } - - info("Commit."); - - tx.commit(); - } - - fail("Transaction should fail."); - } - catch (IgniteTxHeuristicException e) { - log.info("Expected exception: " + e); - } - - for (Integer key : keys) - checkEmpty(key); - } - - /** - * @param key Key. - * @throws Exception If failed. - */ - private void checkEmpty(final Integer key) throws Exception { - idxSpi.forceFail(false); - - info("Check key: " + key); - - for (int i = 0; i < gridCount(); i++) { - GridKernal grid = (GridKernal) grid(i); - - GridCacheAdapter cache = grid.internalCache(null); - - GridCacheMapEntry entry = cache.map().getEntry(key); - - log.info("Entry: " + entry); - - if (entry != null) { - assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + entry + ']', entry.lockedByAny()); - assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + entry + ']', entry.hasValue()); - } - - if (cache.isNear()) { - entry = ((GridNearCacheAdapter)cache).dht().map().getEntry(key); - - log.info("Dht entry: " + entry); - - if (entry != null) { - assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + entry + ']', entry.lockedByAny()); - assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + entry + ']', entry.hasValue()); - } - } - } - - for (int i = 0; i < gridCount(); i++) - assertEquals("Unexpected value for grid " + i, null, grid(i).cache(null).get(key)); - } - - /** - * @param putBefore If {@code true} then puts some value before executing failing operation. - * @param key Key. - * @throws Exception If failed. - */ - private void checkPut(boolean putBefore, final Integer key) throws Exception { - if (putBefore) { - idxSpi.forceFail(false); - - info("Put key: " + key); - - grid(0).cache(null).put(key, 1); - } - - // Execute get from all nodes to create readers for near cache. - for (int i = 0; i < gridCount(); i++) - grid(i).cache(null).get(key); - - idxSpi.forceFail(true); - - info("Going to put: " + key); - - GridTestUtils.assertThrows(log, new Callable<Void>() { - @Override public Void call() throws Exception { - grid(0).cache(null).put(key, 2); - - return null; - } - }, IgniteTxHeuristicException.class, null); - - checkEmpty(key); - } - - /** - * @param putBefore If {@code true} then puts some value before executing failing operation. - * @param key Key. - * @throws Exception If failed. - */ - private void checkTransform(boolean putBefore, final Integer key) throws Exception { - if (putBefore) { - idxSpi.forceFail(false); - - info("Put key: " + key); - - grid(0).cache(null).put(key, 1); - } - - // Execute get from all nodes to create readers for near cache. - for (int i = 0; i < gridCount(); i++) - grid(i).cache(null).get(key); - - idxSpi.forceFail(true); - - info("Going to transform: " + key); - - Throwable e = GridTestUtils.assertThrows(log, new Callable<Void>() { - @Override public Void call() throws Exception { - grid(0).<Integer, Integer>jcache(null).invoke(key, new EntryProcessor<Integer, Integer, Void>() { - @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { - e.setValue(2); - - return null; - } - }); - - return null; - } - }, CacheException.class, null); - - assertTrue("Unexpected cause: " +e, e.getCause() instanceof IgniteTxHeuristicException); - - checkEmpty(key); - } - - /** - * @param putBefore If {@code true} then puts some value before executing failing operation. - * @param keys Keys. - * @throws Exception If failed. - */ - private void checkPutAll(boolean putBefore, Integer ... keys) throws Exception { - assert keys.length > 1; - - if (putBefore) { - idxSpi.forceFail(false); - - Map<Integer, Integer> m = new HashMap<>(); - - for (Integer key : keys) - m.put(key, 1); - - info("Put data: " + m); - - grid(0).cache(null).putAll(m); - } - - // Execute get from all nodes to create readers for near cache. - for (int i = 0; i < gridCount(); i++) { - for (Integer key : keys) - grid(i).cache(null).get(key); - } - - idxSpi.forceFail(true); - - final Map<Integer, Integer> m = new HashMap<>(); - - for (Integer key : keys) - m.put(key, 2); - - info("Going to putAll: " + m); - - GridTestUtils.assertThrows(log, new Callable<Void>() { - @Override public Void call() throws Exception { - grid(0).cache(null).putAll(m); - - return null; - } - }, IgniteTxHeuristicException.class, null); - - for (Integer key : m.keySet()) - checkEmpty(key); - } - - /** - * @param putBefore If {@code true} then puts some value before executing failing operation. - * @param key Key. - * @throws Exception If failed. - */ - private void checkRemove(boolean putBefore, final Integer key) throws Exception { - if (putBefore) { - idxSpi.forceFail(false); - - info("Put key: " + key); - - grid(0).cache(null).put(key, 1); - } - - // Execute get from all nodes to create readers for near cache. - for (int i = 0; i < gridCount(); i++) - grid(i).cache(null).get(key); - - idxSpi.forceFail(true); - - info("Going to remove: " + key); - - GridTestUtils.assertThrows(log, new Callable<Void>() { - @Override public Void call() throws Exception { - grid(0).cache(null).remove(key); - - return null; - } - }, IgniteTxHeuristicException.class, null); - - checkEmpty(key); - } - - /** - * Generates key of a given type for given node. - * - * @param node Node. - * @param type Key type. - * @return Key. - */ - private Integer keyForNode(ClusterNode node, int type) { - GridCache<Integer, Integer> cache = grid(0).cache(null); - - if (cache.configuration().getCacheMode() == LOCAL) - return ++lastKey; - - if (cache.configuration().getCacheMode() == REPLICATED && type == NOT_PRIMARY_AND_BACKUP) - return ++lastKey; - - for (int key = lastKey + 1; key < (lastKey + 10_000); key++) { - switch (type) { - case NOT_PRIMARY_AND_BACKUP: { - if (!cache.affinity().isPrimaryOrBackup(node, key)) { - lastKey = key; - - return key; - } - - break; - } - - case PRIMARY: { - if (cache.affinity().isPrimary(node, key)) { - lastKey = key; - - return key; - } - - break; - } - - case BACKUP: { - if (cache.affinity().isBackup(node, key)) { - lastKey = key; - - return key; - } - - break; - } - - default: - fail(); - } - } - - throw new IllegalStateException("Failed to find key."); - } - - /** - * Indexing SPI that can fail on demand. - */ - private static class TestIndexingSpi extends IgniteSpiAdapter implements GridIndexingSpi { - /** Fail flag. */ - private volatile boolean fail; - - /** - * @param fail Fail flag. - */ - public void forceFail(boolean fail) { - this.fail = fail; - } - - /** {@inheritDoc} */ - @Override public Iterator<?> query(@Nullable String spaceName, Collection<Object> params, @Nullable GridIndexingQueryFilter filters) throws IgniteSpiException { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public void store(@Nullable String spaceName, Object key, Object val, long expirationTime) - throws IgniteSpiException { - if (fail) { - fail = false; - - throw new IgniteSpiException("Test exception."); - } - } - - /** {@inheritDoc} */ - @Override public void remove(@Nullable String spaceName, Object k) - throws IgniteSpiException { - if (fail) { - fail = false; - - throw new IgniteSpiException("Test exception."); - } - } - - /** {@inheritDoc} */ - @Override public void onSwap(@Nullable String spaceName, Object key) throws IgniteSpiException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void onUnswap(@Nullable String spaceName, Object key, Object val) throws IgniteSpiException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void spiStop() throws IgniteSpiException { - // No-op. - } - } -}