http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java new file mode 100644 index 0000000..fed2812 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreAbstractTest.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import javax.cache.configuration.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Basic store test. + */ +public abstract class GridCacheWriteBehindStoreAbstractTest extends GridCommonAbstractTest { + /** Flush frequency. */ + private static final int WRITE_FROM_BEHIND_FLUSH_FREQUENCY = 1000; + + /** Cache store. */ + private static final GridCacheTestStore store = new GridCacheTestStore(); + + /** + * + */ + protected GridCacheWriteBehindStoreAbstractTest() { + super(true /*start grid. */); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + store.resetTimestamp(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + GridCache<?, ?> cache = cache(); + + if (cache != null) + cache.clearAll(); + + store.reset(); + } + + /** @return Caching mode. */ + protected abstract GridCacheMode cacheMode(); + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected final IgniteConfiguration getConfiguration() throws Exception { + IgniteConfiguration c = super.getConfiguration(); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + c.setDiscoverySpi(disco); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(cacheMode()); + cc.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + cc.setSwapEnabled(false); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setDistributionMode(NEAR_PARTITIONED); + + cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + cc.setReadThrough(true); + cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); + + cc.setWriteBehindEnabled(true); + cc.setWriteBehindFlushFrequency(WRITE_FROM_BEHIND_FLUSH_FREQUENCY); + + c.setCacheConfiguration(cc); + + return c; + } + + /** @throws Exception If test fails. */ + public void testWriteThrough() throws Exception { + GridCache<Integer, String> cache = cache(); + + Map<Integer, String> map = store.getMap(); + + assert map.isEmpty(); + + IgniteTx tx = cache.txStart(OPTIMISTIC, REPEATABLE_READ); + + try { + for (int i = 1; i <= 10; i++) { + cache.putx(i, Integer.toString(i)); + + checkLastMethod(null); + } + + tx.commit(); + } + finally { + tx.close(); + } + + // Need to wait WFB flush timeout. + U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); + + checkLastMethod("putAll"); + + assert cache.size() == 10; + + for (int i = 1; i <= 10; i++) { + String val = map.get(i); + + assert val != null; + assert val.equals(Integer.toString(i)); + } + + store.resetLastMethod(); + + tx = cache.txStart(); + + try { + for (int i = 1; i <= 10; i++) { + String val = cache.remove(i); + + checkLastMethod(null); + + assert val != null; + assert val.equals(Integer.toString(i)); + } + + tx.commit(); + } + finally { + tx.close(); + } + + // Need to wait WFB flush timeout. + U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); + + checkLastMethod("removeAll"); + + assert map.isEmpty(); + } + + /** @throws Exception If test failed. */ + public void testReadThrough() throws Exception { + GridCache<Integer, String> cache = cache(); + + Map<Integer, String> map = store.getMap(); + + assert map.isEmpty(); + + try (IgniteTx tx = cache.txStart(OPTIMISTIC, REPEATABLE_READ)) { + for (int i = 1; i <= 10; i++) + cache.putx(i, Integer.toString(i)); + + checkLastMethod(null); + + tx.commit(); + } + + // Need to wait WFB flush timeout. + U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); + + checkLastMethod("putAll"); + + for (int i = 1; i <= 10; i++) { + String val = map.get(i); + + assert val != null; + assert val.equals(Integer.toString(i)); + } + + cache.clearAll(); + + assert cache.isEmpty(); + assert cache.isEmpty(); + + // Need to wait WFB flush timeout. + U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); + + assert map.size() == 10; + + for (int i = 1; i <= 10; i++) { + // Read through. + String val = cache.get(i); + + checkLastMethod("load"); + + assert val != null; + assert val.equals(Integer.toString(i)); + } + + assert cache.size() == 10; + + cache.clearAll(); + + assert cache.isEmpty(); + assert cache.isEmpty(); + + assert map.size() == 10; + + Collection<Integer> keys = new ArrayList<>(); + + for (int i = 1; i <= 10; i++) + keys.add(i); + + // Read through. + Map<Integer, String> vals = cache.getAll(keys); + + checkLastMethod("loadAll"); + + assert vals != null; + assert vals.size() == 10; + + for (int i = 1; i <= 10; i++) { + String val = vals.get(i); + + assert val != null; + assert val.equals(Integer.toString(i)); + } + + // Write through. + cache.removeAll(keys); + + // Need to wait WFB flush timeout. + U.sleep(WRITE_FROM_BEHIND_FLUSH_FREQUENCY + 100); + + checkLastMethod("removeAll"); + + assert cache.isEmpty(); + assert cache.isEmpty(); + + assert map.isEmpty(); + } + + /** @throws Exception If failed. */ + public void testMultithreaded() throws Exception { + final ConcurrentMap<String, Set<Integer>> perThread = new ConcurrentHashMap<>(); + + final AtomicBoolean running = new AtomicBoolean(true); + + final GridCache<Integer, String> cache = cache(); + + IgniteFuture<?> fut = multithreadedAsync(new Runnable() { + @SuppressWarnings({"NullableProblems"}) + @Override public void run() { + // Initialize key set for this thread. + Set<Integer> set = new HashSet<>(); + + Set<Integer> old = perThread.putIfAbsent(Thread.currentThread().getName(), set); + + if (old != null) + set = old; + + Random rnd = new Random(); + + try { + int keyCnt = 20000; + + while (running.get()) { + int op = rnd.nextInt(2); + int key = rnd.nextInt(keyCnt); + + switch (op) { + case 0: + cache.put(key, "val" + key); + set.add(key); + + break; + + case 1: + default: + cache.remove(key); + set.remove(key); + + break; + } + } + } + catch (IgniteCheckedException e) { + error("Unexpected exception in put thread", e); + + assert false; + } + } + }, 10, "put"); + + U.sleep(10000); + + running.set(false); + + fut.get(); + + U.sleep(5 * WRITE_FROM_BEHIND_FLUSH_FREQUENCY); + + Map<Integer, String> stored = store.getMap(); + + for (Map.Entry<Integer, String> entry : stored.entrySet()) { + int key = entry.getKey(); + + assertEquals("Invalid value for key " + key, "val" + key, entry.getValue()); + + boolean found = false; + + for (Set<Integer> threadPuts : perThread.values()) { + if (threadPuts.contains(key)) { + found = true; + + break; + } + } + + assert found : "No threads found that put key " + key; + } + } + + /** @param mtd Expected last method value. */ + private void checkLastMethod(@Nullable String mtd) { + String lastMtd = store.getLastMethod(); + + if (mtd == null) + assert lastMtd == null : "Last method must be null: " + lastMtd; + else { + assert lastMtd != null : "Last method must be not null"; + assert lastMtd.equals(mtd) : "Last method does not match [expected=" + mtd + ", lastMtd=" + lastMtd + ']'; + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java new file mode 100644 index 0000000..1cc35e4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreLocalTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +/** + * Tests {@link GridCacheWriteBehindStore} in grid configuration. + */ +public class GridCacheWriteBehindStoreLocalTest extends GridCacheWriteBehindStoreAbstractTest { + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return GridCacheMode.LOCAL; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java new file mode 100644 index 0000000..4289a03 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreMultithreadedSelfTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +/** + * Multithreaded tests for {@link GridCacheWriteBehindStore}. + */ +public class GridCacheWriteBehindStoreMultithreadedSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest { + /** + * This test performs complex set of operations on store from multiple threads. + * + * @throws Exception If failed. + */ + public void testPutGetRemove() throws Exception { + initStore(2); + + Set<Integer> exp; + + try { + exp = runPutGetRemoveMultithreaded(10, 10); + } + finally { + shutdownStore(); + } + + Map<Integer, String> map = delegate.getMap(); + + Collection<Integer> extra = new HashSet<>(map.keySet()); + + extra.removeAll(exp); + + assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty()); + + Collection<Integer> missing = new HashSet<>(exp); + + missing.removeAll(map.keySet()); + + assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty()); + + for (Integer key : exp) + assertEquals("Invalid value for key " + key, "val" + key, map.get(key)); + } + + /** + * Tests that cache would keep values if underlying store fails. + * + * @throws Exception If failed. + */ + public void testStoreFailure() throws Exception { + delegate.setShouldFail(true); + + initStore(2); + + Set<Integer> exp; + + try { + exp = runPutGetRemoveMultithreaded(10, 10); + + U.sleep(FLUSH_FREQUENCY); + + info(">>> There are " + store.getWriteBehindErrorRetryCount() + " entries in RETRY state"); + + delegate.setShouldFail(false); + + // Despite that we set shouldFail flag to false, flush thread may just have caught an exception. + // If we move store to the stopping state right away, this value will be lost. That's why this sleep + // is inserted here to let all exception handlers in write-behind store exit. + U.sleep(1000); + } + finally { + shutdownStore(); + } + + Map<Integer, String> map = delegate.getMap(); + + Collection<Integer> extra = new HashSet<>(map.keySet()); + + extra.removeAll(exp); + + assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty()); + + Collection<Integer> missing = new HashSet<>(exp); + + missing.removeAll(map.keySet()); + + assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty()); + + for (Integer key : exp) + assertEquals("Invalid value for key " + key, "val" + key, map.get(key)); + } + + /** + * Tests store consistency in case of high put rate, when flush is performed from the same thread + * as put or remove operation. + * + * @throws Exception If failed. + */ + public void testFlushFromTheSameThread() throws Exception { + // 50 milliseconds should be enough. + delegate.setOperationDelay(50); + + initStore(2); + + Set<Integer> exp; + + int start = store.getWriteBehindTotalCriticalOverflowCount(); + + try { + //We will have in total 5 * CACHE_SIZE keys that should be enough to grow map size to critical value. + exp = runPutGetRemoveMultithreaded(5, CACHE_SIZE); + } + finally { + log.info(">>> Done inserting, shutting down the store"); + + shutdownStore(); + } + + // Restore delay. + delegate.setOperationDelay(0); + + Map<Integer, String> map = delegate.getMap(); + + int end = store.getWriteBehindTotalCriticalOverflowCount(); + + log.info(">>> There are " + exp.size() + " keys in store, " + (end - start) + " overflows detected"); + + assertTrue("No cache overflows detected (a bug or too few keys or too few delay?)", end > start); + + Collection<Integer> extra = new HashSet<>(map.keySet()); + + extra.removeAll(exp); + + assertTrue("The underlying store contains extra keys: " + extra, extra.isEmpty()); + + Collection<Integer> missing = new HashSet<>(exp); + + missing.removeAll(map.keySet()); + + assertTrue("Missing keys in the underlying store: " + missing, missing.isEmpty()); + + for (Integer key : exp) + assertEquals("Invalid value for key " + key, "val" + key, map.get(key)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java new file mode 100644 index 0000000..b31c58f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedMultiNodeSelfTest.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.configuration.*; +import java.util.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Tests write-behind store with near and dht commit option. + */ +public class GridCacheWriteBehindStorePartitionedMultiNodeSelfTest extends GridCommonAbstractTest { + /** Grids to start. */ + private static final int GRID_CNT = 5; + + /** Ip finder. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Flush frequency. */ + public static final int WRITE_BEHIND_FLUSH_FREQ = 1000; + + /** Stores per grid. */ + private GridCacheTestStore[] stores = new GridCacheTestStore[GRID_CNT]; + + /** Start grid counter. */ + private int idx; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(GridCacheMode.PARTITIONED); + cc.setWriteBehindEnabled(true); + cc.setWriteBehindFlushFrequency(WRITE_BEHIND_FLUSH_FREQ); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setDistributionMode(NEAR_PARTITIONED); + + CacheStore store = stores[idx] = new GridCacheTestStore(); + + cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + cc.setReadThrough(true); + cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); + + c.setCacheConfiguration(cc); + + idx++; + + return c; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stores = null; + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + private void prepare() throws Exception { + idx = 0; + + startGrids(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testSingleWritesOnDhtNode() throws Exception { + checkSingleWrites(); + } + + /** + * @throws Exception If failed. + */ + public void testBatchWritesOnDhtNode() throws Exception { + checkBatchWrites(); + } + + /** + * @throws Exception If failed. + */ + public void testTxWritesOnDhtNode() throws Exception { + checkTxWrites(); + } + + /** + * @throws Exception If failed. + */ + private void checkSingleWrites() throws Exception { + prepare(); + + GridCache<Integer, String> cache = grid(0).cache(null); + + for (int i = 0; i < 100; i++) + cache.put(i, String.valueOf(i)); + + checkWrites(); + } + + /** + * @throws Exception If failed. + */ + private void checkBatchWrites() throws Exception { + prepare(); + + Map<Integer, String> map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, String.valueOf(i)); + + grid(0).cache(null).putAll(map); + + checkWrites(); + } + + /** + * @throws Exception If failed. + */ + private void checkTxWrites() throws Exception { + prepare(); + + GridCache<Object, Object> cache = grid(0).cache(null); + + try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (int i = 0; i < 100; i++) + cache.put(i, String.valueOf(i)); + + tx.commit(); + } + + checkWrites(); + } + + /** + * @throws IgniteInterruptedException If sleep was interrupted. + */ + private void checkWrites() throws IgniteInterruptedException { + U.sleep(WRITE_BEHIND_FLUSH_FREQ * 2); + + Collection<Integer> allKeys = new ArrayList<>(100); + + for (int i = 0; i < GRID_CNT; i++) { + Map<Integer,String> map = stores[i].getMap(); + + assertFalse("Missing writes for node: " + i, map.isEmpty()); + + allKeys.addAll(map.keySet()); + + // Check there is no intersection. + for (int j = 0; j < GRID_CNT; j++) { + if (i == j) + continue; + + Collection<Integer> intersection = new HashSet<>(stores[j].getMap().keySet()); + + intersection.retainAll(map.keySet()); + + assertTrue(intersection.isEmpty()); + } + } + + assertEquals(100, allKeys.size()); + + for (int i = 0; i < 100; i++) + assertTrue(allKeys.contains(i)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java new file mode 100644 index 0000000..2a7599b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStorePartitionedTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +/** + * Tests {@link GridCacheWriteBehindStore} in partitioned configuration. + */ +public class GridCacheWriteBehindStorePartitionedTest extends GridCacheWriteBehindStoreAbstractTest { + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return GridCacheMode.PARTITIONED; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java new file mode 100644 index 0000000..1ea5d63 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreReplicatedTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +/** + * Tests {@link GridCacheWriteBehindStore} in grid configuration. + */ +public class GridCacheWriteBehindStoreReplicatedTest extends GridCacheWriteBehindStoreAbstractTest { + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return GridCacheMode.REPLICATED; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java new file mode 100644 index 0000000..65eaa81 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheWriteBehindStoreSelfTest.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jdk8.backport.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * This class provides basic tests for {@link GridCacheWriteBehindStore}. + */ +public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest { + /** + * Tests correct store shutdown when underlying store fails, + * + * @throws Exception If failed. + */ + public void testShutdownWithFailure() throws Exception { + final AtomicReference<Exception> err = new AtomicReference<>(); + + multithreadedAsync(new Runnable() { + @Override public void run() { + try { + delegate.setShouldFail(true); + + initStore(2); + + try { + store.write(new CacheEntryImpl<>(1, "val1")); + store.write(new CacheEntryImpl<>(2, "val2")); + } + finally { + shutdownStore(); + + delegate.setShouldFail(false); + } + } + catch (Exception e) { + err.set(e); + } + } + }, 1).get(); + + if (err.get() != null) + throw err.get(); + } + + /** + * @throws Exception If failed. + */ + public void testSimpleStore() throws Exception { + initStore(2); + + try { + store.write(new CacheEntryImpl<>(1, "v1")); + store.write(new CacheEntryImpl<>(2, "v2")); + + assertEquals("v1", store.load(1)); + assertEquals("v2", store.load(2)); + assertNull(store.load(3)); + + store.delete(1); + + assertNull(store.load(1)); + assertEquals("v2", store.load(2)); + assertNull(store.load(3)); + } + finally { + shutdownStore(); + } + } + + /** + * Check that all values written to the store will be in underlying store after timeout or due to size limits. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"NullableProblems"}) + public void testValuePropagation() throws Exception { + // Need to test size-based write. + initStore(1); + + try { + for (int i = 0; i < CACHE_SIZE * 2; i++) + store.write(new CacheEntryImpl<>(i, "val" + i)); + + U.sleep(200); + + for (int i = 0; i < CACHE_SIZE; i++) { + String val = delegate.load(i); + + assertNotNull("Value for [key= " + i + "] was not written in store", val); + assertEquals("Invalid value [key=" + i + "]", "val" + i, val); + } + + U.sleep(FLUSH_FREQUENCY + 300); + + for (int i = CACHE_SIZE; i < CACHE_SIZE * 2; i++) { + String val = delegate.load(i); + + assertNotNull("Value for [key= " + i + "] was not written in store", val); + assertEquals("Invalid value [key=" + i + "]", "val" + i, val); + } + } + finally { + shutdownStore(); + } + } + + /** + * Tests store behaviour under continuous put of the same key with different values. + * + * @throws Exception If failed + */ + public void testContinuousPut() throws Exception { + initStore(2); + + try { + final AtomicBoolean running = new AtomicBoolean(true); + + final AtomicInteger actualPutCnt = new AtomicInteger(); + + IgniteFuture<?> fut = multithreadedAsync(new Runnable() { + @SuppressWarnings({"NullableProblems"}) + @Override public void run() { + try { + while (running.get()) { + for (int i = 0; i < CACHE_SIZE; i++) { + store.write(new CacheEntryImpl<>(i, "val-0")); + + actualPutCnt.incrementAndGet(); + + store.write(new CacheEntryImpl<>(i, "val" + i)); + + actualPutCnt.incrementAndGet(); + } + } + } + catch (Exception e) { + error("Unexpected exception in put thread", e); + + assert false; + } + } + }, 1, "put"); + + U.sleep(FLUSH_FREQUENCY * 2 + 500); + + int delegatePutCnt = delegate.getPutAllCount(); + + running.set(false); + + fut.get(); + + log().info(">>> [putCnt = " + actualPutCnt.get() + ", delegatePutCnt=" + delegatePutCnt + "]"); + + assertTrue("No puts were made to the underlying store", delegatePutCnt > 0); + assertTrue("Too many puts were made to the underlying store", delegatePutCnt < actualPutCnt.get() / 10); + } + finally { + shutdownStore(); + } + + // These checks must be done after the store shut down + assertEquals("Invalid store size", CACHE_SIZE, delegate.getMap().size()); + + for (int i = 0; i < CACHE_SIZE; i++) + assertEquals("Invalid value stored", "val" + i, delegate.getMap().get(i)); + } + + /** + * Tests that all values were put into the store will be written to the underlying store + * after shutdown is called. + * + * @throws Exception If failed. + */ + public void testShutdown() throws Exception { + initStore(2); + + try { + final AtomicBoolean running = new AtomicBoolean(true); + + IgniteFuture<?> fut = multithreadedAsync(new Runnable() { + @SuppressWarnings({"NullableProblems"}) + @Override public void run() { + try { + while (running.get()) { + for (int i = 0; i < CACHE_SIZE; i++) { + store.write(new CacheEntryImpl<>(i, "val-0")); + + store.write(new CacheEntryImpl<>(i, "val" + i)); + } + } + } + catch (Exception e) { + error("Unexpected exception in put thread", e); + + assert false; + } + } + }, 1, "put"); + + U.sleep(300); + + running.set(false); + + fut.get(); + } + finally { + shutdownStore(); + } + + // These checks must be done after the store shut down + assertEquals("Invalid store size", CACHE_SIZE, delegate.getMap().size()); + + for (int i = 0; i < CACHE_SIZE; i++) + assertEquals("Invalid value stored", "val" + i, delegate.getMap().get(i)); + } + + /** + * Tests that all values will be written to the underlying store + * right in the same order as they were put into the store. + * + * @throws Exception If failed. + */ + public void testBatchApply() throws Exception { + delegate = new GridCacheTestStore(new ConcurrentLinkedHashMap<Integer, String>()); + + initStore(1); + + List<Integer> intList = new ArrayList<>(CACHE_SIZE); + + try { + for (int i = 0; i < CACHE_SIZE; i++) { + store.write(new CacheEntryImpl<>(i, "val" + i)); + + intList.add(i); + } + } + finally { + shutdownStore(); + } + + Map<Integer, String> underlyingMap = delegate.getMap(); + + assertTrue("Store map key set: " + underlyingMap.keySet(), F.eqOrdered(underlyingMap.keySet(), intList)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java new file mode 100644 index 0000000..b0286b6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java @@ -0,0 +1,492 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Tests for local transactions. + */ +@SuppressWarnings( {"BusyWait"}) +abstract class IgniteTxAbstractTest extends GridCommonAbstractTest { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Execution count. */ + private static final AtomicInteger cntr = new AtomicInteger(); + + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** + * Start grid by default. + */ + protected IgniteTxAbstractTest() { + super(false /*start grid. */); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + return c; + } + + /** + * @return Grid count. + */ + protected abstract int gridCount(); + + /** + * @return Key count. + */ + protected abstract int keyCount(); + + /** + * @return Maximum key value. + */ + protected abstract int maxKeyValue(); + + /** + * @return Thread iterations. + */ + protected abstract int iterations(); + + /** + * @return True if in-test logging is enabled. + */ + protected abstract boolean isTestDebug(); + + /** + * @return {@code True} if memory stats should be printed. + */ + protected abstract boolean printMemoryStats(); + + /** {@inheritDoc} */ + private void debug(String msg) { + if (isTestDebug()) + info(msg); + } + + /** + * @throws Exception If failed. + */ + @Override protected void beforeTestsStarted() throws Exception { + for (int i = 0; i < gridCount(); i++) + startGrid(i); + } + + /** + * @throws Exception If failed. + */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @param i Grid index. + * @return Cache. + */ + @SuppressWarnings("unchecked") + @Override protected GridCache<Integer, String> cache(int i) { + return grid(i).cache(null); + } + + /** + * @return Keys. + */ + protected Iterable<Integer> getKeys() { + List<Integer> keys = new ArrayList<>(keyCount()); + + for (int i = 0; i < keyCount(); i++) + keys.add(RAND.nextInt(maxKeyValue()) + 1); + + Collections.sort(keys); + + return Collections.unmodifiableList(keys); + } + + /** + * @return Random cache operation. + */ + protected OP getOp() { + switch (RAND.nextInt(3)) { + case 0: { return OP.READ; } + case 1: { return OP.WRITE; } + case 2: { return OP.REMOVE; } + + // Should never be reached. + default: { assert false; return null; } + } + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. + * @throws Exception If check failed. + */ + protected void checkCommit(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception { + int gridIdx = RAND.nextInt(gridCount()); + + Ignite ignite = grid(gridIdx); + + if (isTestDebug()) + debug("Checking commit on grid: " + ignite.cluster().localNode().id()); + + for (int i = 0; i < iterations(); i++) { + GridCache<Integer, String> cache = cache(gridIdx); + + IgniteTx tx = cache.txStart(concurrency, isolation, 0, 0); + + try { + int prevKey = -1; + + for (Integer key : getKeys()) { + // Make sure we have the same locking order for all concurrent transactions. + assert key >= prevKey : "key: " + key + ", prevKey: " + prevKey; + + if (isTestDebug()) { + GridCacheAffinityFunction aff = cache.configuration().getAffinity(); + + int part = aff.partition(key); + + debug("Key affinity [key=" + key + ", partition=" + part + ", affinity=" + + U.toShortString(cache.affinity().mapPartitionToPrimaryAndBackups(part)) + ']'); + } + + String val = Integer.toString(key); + + switch (getOp()) { + case READ: { + if (isTestDebug()) + debug("Reading key [key=" + key + ", i=" + i + ']'); + + val = cache.get(key); + + if (isTestDebug()) + debug("Read value for key [key=" + key + ", val=" + val + ']'); + + break; + } + + case WRITE: { + if (isTestDebug()) + debug("Writing key and value [key=" + key + ", val=" + val + ", i=" + i + ']'); + + cache.put(key, val); + + break; + } + + case REMOVE: { + if (isTestDebug()) + debug("Removing key [key=" + key + ", i=" + i + ']'); + + cache.remove(key); + + break; + } + + default: { assert false; } + } + } + + tx.commit(); + + if (isTestDebug()) + debug("Committed transaction [i=" + i + ", tx=" + tx + ']'); + } + catch (IgniteTxOptimisticException e) { + if (concurrency != OPTIMISTIC || isolation != SERIALIZABLE) { + error("Received invalid optimistic failure.", e); + + throw e; + } + + if (isTestDebug()) + info("Optimistic transaction failure (will rollback) [i=" + i + ", msg=" + e.getMessage() + + ", tx=" + tx.xid() + ']'); + + try { + tx.rollback(); + } + catch (IgniteCheckedException ex) { + error("Failed to rollback optimistic failure: " + tx, ex); + + throw ex; + } + } + catch (Exception e) { + error("Transaction failed (will rollback): " + tx, e); + + tx.rollback(); + + throw e; + } + catch (Error e) { + error("Error when executing transaction (will rollback): " + tx, e); + + tx.rollback(); + + throw e; + } + finally { + IgniteTx t = cache.tx(); + + assert t == null : "Thread should not have transaction upon completion ['t==tx'=" + (t == tx) + + ", t=" + t + (t != tx ? "tx=" + tx : "tx=''") + ']'; + } + } + + if (printMemoryStats()) { + if (cntr.getAndIncrement() % 100 == 0) + // Print transaction memory stats. + ((GridKernal)grid(gridIdx)).internalCache().context().tm().printMemoryStats(); + } + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. + * @throws IgniteCheckedException If check failed. + */ + protected void checkRollback(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) + throws Exception { + checkRollback(new ConcurrentHashMap<Integer, String>(), concurrency, isolation); + } + + /** + * @param map Map to check. + * @param concurrency Concurrency. + * @param isolation Isolation. + * @throws IgniteCheckedException If check failed. + */ + protected void checkRollback(ConcurrentMap<Integer, String> map, IgniteTxConcurrency concurrency, + IgniteTxIsolation isolation) throws Exception { + int gridIdx = RAND.nextInt(gridCount()); + + Ignite ignite = grid(gridIdx); + + if (isTestDebug()) + debug("Checking commit on grid: " + ignite.cluster().localNode().id()); + + for (int i = 0; i < iterations(); i++) { + GridCache<Integer, String> cache = cache(gridIdx); + + IgniteTx tx = cache.txStart(concurrency, isolation, 0, 0); + + try { + for (Integer key : getKeys()) { + if (isTestDebug()) { + GridCacheAffinityFunction aff = cache.configuration().getAffinity(); + + int part = aff.partition(key); + + debug("Key affinity [key=" + key + ", partition=" + part + ", affinity=" + + U.toShortString(cache.affinity().mapPartitionToPrimaryAndBackups(part)) + ']'); + } + + String val = Integer.toString(key); + + switch (getOp()) { + case READ: { + debug("Reading key: " + key); + + checkMap(map, key, cache.get(key)); + + break; + } + + case WRITE: { + debug("Writing key and value [key=" + key + ", val=" + val + ']'); + + checkMap(map, key, cache.put(key, val)); + + break; + } + + case REMOVE: { + debug("Removing key: " + key); + + checkMap(map, key, cache.remove(key)); + + break; + } + + default: { assert false; } + } + } + + tx.rollback(); + + debug("Rolled back transaction: " + tx); + } + catch (IgniteTxOptimisticException e) { + tx.rollback(); + + log.warning("Rolled back transaction due to optimistic exception [tx=" + tx + ", e=" + e + ']'); + + throw e; + } + catch (Exception e) { + tx.rollback(); + + error("Rolled back transaction due to exception [tx=" + tx + ", e=" + e + ']'); + + throw e; + } + finally { + IgniteTx t1 = cache.tx(); + + debug("t1=" + t1); + + assert t1 == null : "Thread should not have transaction upon completion ['t==tx'=" + (t1 == tx) + + ", t=" + t1 + ']'; + } + } + } + + /** + * @param map Map to check against. + * @param key Key. + * @param val Value. + */ + private void checkMap(ConcurrentMap<Integer, String> map, Integer key, String val) { + if (val != null) { + String v = map.putIfAbsent(key, val); + + assert v == null || v.equals(val); + } + } + + /** + * Checks integrity of all caches after tests. + * + * @throws IgniteCheckedException If check failed. + */ + @SuppressWarnings({"ErrorNotRethrown"}) + protected void finalChecks() throws Exception { + for (int i = 1; i <= maxKeyValue(); i++) { + for (int k = 0; k < 3; k++) { + try { + GridCacheEntry<Integer, String> e1 = null; + + String v1 = null; + + for (int j = 0; j < gridCount(); j++) { + GridCache<Integer, String> cache = cache(j); + + IgniteTx tx = cache.tx(); + + assertNull("Transaction is not completed: " + tx, tx); + + if (j == 0) { + e1 = cache.entry(i); + + v1 = e1.get(); + } + else { + GridCacheEntry<Integer, String> e2 = cache.entry(i); + + String v2 = e2.get(); + + if (!F.eq(v2, v1)) { + v1 = e1.get(); + v2 = e2.get(); + } + + assert F.eq(v2, v1) : + "Invalid cached value [key=" + i + ", v1=" + v1 + ", v2=" + v2 + ", e1=" + e1 + + ", e2=" + e2 + ", grid=" + j + ']'; + } + } + + break; + } + catch (AssertionError e) { + if (k == 2) + throw e; + else + // Wait for transactions to complete. + Thread.sleep(500); + } + } + } + + for (int i = 1; i <= maxKeyValue(); i++) { + for (int k = 0; k < 3; k++) { + try { + for (int j = 0; j < gridCount(); j++) { + GridCacheProjection<Integer, String> cache = cache(j); + + cache.removeAll(); + +// assert cache.keySet().isEmpty() : "Cache is not empty: " + cache.entrySet(); + } + + break; + } + catch (AssertionError e) { + if (k == 2) + throw e; + else + // Wait for transactions to complete. + Thread.sleep(500); + } + } + } + } + + /** + * Cache operation. + */ + protected enum OP { + /** Cache read. */ + READ, + + /** Cache write. */ + WRITE, + + /** Cache remove. */ + REMOVE + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConcurrentGetAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConcurrentGetAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConcurrentGetAbstractTest.java new file mode 100644 index 0000000..ee07f72 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConcurrentGetAbstractTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Checks multithreaded put/get cache operations on one node. + */ +public abstract class IgniteTxConcurrentGetAbstractTest extends GridCommonAbstractTest { + /** Debug flag. */ + private static final boolean DEBUG = false; + + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int THREAD_NUM = 20; + + /** + * Default constructor. + * + */ + protected IgniteTxConcurrentGetAbstractTest() { + super(true /** Start grid. */); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + return cfg; + } + + /** + * @param g Grid. + * @return Near cache. + */ + GridNearCacheAdapter<String, Integer> near(Ignite g) { + return (GridNearCacheAdapter<String, Integer>)((GridKernal)g).<String, Integer>internalCache(); + } + + /** + * @param g Grid. + * @return DHT cache. + */ + GridDhtCacheAdapter<String, Integer> dht(Ignite g) { + return near(g).dht(); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testPutGet() throws Exception { + // Random key. + final String key = UUID.randomUUID().toString(); + + final Ignite ignite = grid(); + + ignite.cache(null).put(key, "val"); + + GridCacheEntryEx<String,Integer> dhtEntry = dht(ignite).peekEx(key); + + if (DEBUG) + info("DHT entry [hash=" + System.identityHashCode(dhtEntry) + ", entry=" + dhtEntry + ']'); + + String val = txGet(ignite, key); + + assertNotNull(val); + + info("Starting threads: " + THREAD_NUM); + + multithreaded(new Callable<String>() { + @Override public String call() throws Exception { + return txGet(ignite, key); + } + }, THREAD_NUM, "getter-thread"); + } + + /** + * @param ignite Grid. + * @param key Key. + * @return Value. + * @throws Exception If failed. + */ + private String txGet(Ignite ignite, String key) throws Exception { + try (IgniteTx tx = ignite.cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) { + GridCacheEntryEx<String, Integer> dhtEntry = dht(ignite).peekEx(key); + + if (DEBUG) + info("DHT entry [hash=" + System.identityHashCode(dhtEntry) + ", xid=" + tx.xid() + + ", entry=" + dhtEntry + ']'); + + String val = ignite.<String, String>cache(null).get(key); + + assertNotNull(val); + assertEquals("val", val); + + tx.commit(); + + return val; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java new file mode 100644 index 0000000..126da68 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxExceptionAbstractSelfTest.java @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.indexing.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.testframework.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.processor.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.GridCacheMode.*; + +/** + * Tests that transaction is invalidated in case of {@link IgniteTxHeuristicException}. + */ +public abstract class IgniteTxExceptionAbstractSelfTest extends GridCacheAbstractSelfTest { + /** Index SPI throwing exception. */ + private static TestIndexingSpi idxSpi = new TestIndexingSpi(); + + /** */ + private static final int PRIMARY = 0; + + /** */ + private static final int BACKUP = 1; + + /** */ + private static final int NOT_PRIMARY_AND_BACKUP = 2; + + /** */ + private static Integer lastKey; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setIndexingSpi(idxSpi); + + cfg.getTransactionsConfiguration().setTxSerializableEnabled(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration ccfg = super.cacheConfiguration(gridName); + + ccfg.setQueryIndexEnabled(true); + ccfg.setCacheStoreFactory(null); + ccfg.setReadThrough(false); + ccfg.setWriteThrough(false); + ccfg.setLoadPreviousValue(true); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + lastKey = 0; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + idxSpi.forceFail(false); + } + + /** + * @throws Exception If failed. + */ + public void testPutNear() throws Exception { + checkPut(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + + checkPut(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testPutPrimary() throws Exception { + checkPut(true, keyForNode(grid(0).localNode(), PRIMARY)); + + checkPut(false, keyForNode(grid(0).localNode(), PRIMARY)); + } + + /** + * @throws Exception If failed. + */ + public void testPutBackup() throws Exception { + checkPut(true, keyForNode(grid(0).localNode(), BACKUP)); + + checkPut(false, keyForNode(grid(0).localNode(), BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testPutAll() throws Exception { + checkPutAll(true, keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY)); + + checkPutAll(false, keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY)); + + if (gridCount() > 1) { + checkPutAll(true, keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY)); + + checkPutAll(false, keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY)); + } + } + + /** + * @throws Exception If failed. + */ + public void testRemoveNear() throws Exception { + checkRemove(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + + checkRemove(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testRemovePrimary() throws Exception { + checkRemove(false, keyForNode(grid(0).localNode(), PRIMARY)); + + checkRemove(true, keyForNode(grid(0).localNode(), PRIMARY)); + } + + /** + * @throws Exception If failed. + */ + public void testRemoveBackup() throws Exception { + checkRemove(false, keyForNode(grid(0).localNode(), BACKUP)); + + checkRemove(true, keyForNode(grid(0).localNode(), BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testTransformNear() throws Exception { + checkTransform(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + + checkTransform(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testTransformPrimary() throws Exception { + checkTransform(false, keyForNode(grid(0).localNode(), PRIMARY)); + + checkTransform(true, keyForNode(grid(0).localNode(), PRIMARY)); + } + + /** + * @throws Exception If failed. + */ + public void testTransformBackup() throws Exception { + checkTransform(false, keyForNode(grid(0).localNode(), BACKUP)); + + checkTransform(true, keyForNode(grid(0).localNode(), BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testPutNearTx() throws Exception { + for (IgniteTxConcurrency concurrency : IgniteTxConcurrency.values()) { + for (IgniteTxIsolation isolation : IgniteTxIsolation.values()) { + checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + + checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testPutPrimaryTx() throws Exception { + for (IgniteTxConcurrency concurrency : IgniteTxConcurrency.values()) { + for (IgniteTxIsolation isolation : IgniteTxIsolation.values()) { + checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), PRIMARY)); + + checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), PRIMARY)); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testPutBackupTx() throws Exception { + for (IgniteTxConcurrency concurrency : IgniteTxConcurrency.values()) { + for (IgniteTxIsolation isolation : IgniteTxIsolation.values()) { + checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), BACKUP)); + + checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), BACKUP)); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testPutMultipleKeysTx() throws Exception { + for (IgniteTxConcurrency concurrency : IgniteTxConcurrency.values()) { + for (IgniteTxIsolation isolation : IgniteTxIsolation.values()) { + checkPutTx(true, concurrency, isolation, + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY)); + + checkPutTx(false, concurrency, isolation, + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY)); + + if (gridCount() > 1) { + checkPutTx(true, concurrency, isolation, + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY)); + + checkPutTx(false, concurrency, isolation, + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY)); + } + } + } + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param keys Keys. + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @throws Exception If failed. + */ + private void checkPutTx(boolean putBefore, IgniteTxConcurrency concurrency, + IgniteTxIsolation isolation, final Integer... keys) throws Exception { + assertTrue(keys.length > 0); + + info("Test transaction [concurrency=" + concurrency + ", isolation=" + isolation + ']'); + + GridCache<Integer, Integer> cache = grid(0).cache(null); + + if (putBefore) { + idxSpi.forceFail(false); + + info("Start transaction."); + + try (IgniteTx tx = cache.txStart(concurrency, isolation)) { + for (Integer key : keys) { + info("Put " + key); + + cache.put(key, 1); + } + + info("Commit."); + + tx.commit(); + } + } + + // Execute get from all nodes to create readers for near cache. + for (int i = 0; i < gridCount(); i++) { + for (Integer key : keys) + grid(i).cache(null).get(key); + } + + idxSpi.forceFail(true); + + try { + info("Start transaction."); + + try (IgniteTx tx = cache.txStart(concurrency, isolation)) { + for (Integer key : keys) { + info("Put " + key); + + cache.put(key, 2); + } + + info("Commit."); + + tx.commit(); + } + + fail("Transaction should fail."); + } + catch (IgniteTxHeuristicException e) { + log.info("Expected exception: " + e); + } + + for (Integer key : keys) + checkEmpty(key); + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void checkEmpty(final Integer key) throws Exception { + idxSpi.forceFail(false); + + info("Check key: " + key); + + for (int i = 0; i < gridCount(); i++) { + GridKernal grid = (GridKernal) grid(i); + + GridCacheAdapter cache = grid.internalCache(null); + + GridCacheMapEntry entry = cache.map().getEntry(key); + + log.info("Entry: " + entry); + + if (entry != null) { + assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + entry + ']', entry.lockedByAny()); + assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + entry + ']', entry.hasValue()); + } + + if (cache.isNear()) { + entry = ((GridNearCacheAdapter)cache).dht().map().getEntry(key); + + log.info("Dht entry: " + entry); + + if (entry != null) { + assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + entry + ']', entry.lockedByAny()); + assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + entry + ']', entry.hasValue()); + } + } + } + + for (int i = 0; i < gridCount(); i++) + assertEquals("Unexpected value for grid " + i, null, grid(i).cache(null).get(key)); + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param key Key. + * @throws Exception If failed. + */ + private void checkPut(boolean putBefore, final Integer key) throws Exception { + if (putBefore) { + idxSpi.forceFail(false); + + info("Put key: " + key); + + grid(0).cache(null).put(key, 1); + } + + // Execute get from all nodes to create readers for near cache. + for (int i = 0; i < gridCount(); i++) + grid(i).cache(null).get(key); + + idxSpi.forceFail(true); + + info("Going to put: " + key); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + grid(0).cache(null).put(key, 2); + + return null; + } + }, IgniteTxHeuristicException.class, null); + + checkEmpty(key); + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param key Key. + * @throws Exception If failed. + */ + private void checkTransform(boolean putBefore, final Integer key) throws Exception { + if (putBefore) { + idxSpi.forceFail(false); + + info("Put key: " + key); + + grid(0).cache(null).put(key, 1); + } + + // Execute get from all nodes to create readers for near cache. + for (int i = 0; i < gridCount(); i++) + grid(i).cache(null).get(key); + + idxSpi.forceFail(true); + + info("Going to transform: " + key); + + Throwable e = GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + grid(0).<Integer, Integer>jcache(null).invoke(key, new EntryProcessor<Integer, Integer, Void>() { + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + e.setValue(2); + + return null; + } + }); + + return null; + } + }, CacheException.class, null); + + assertTrue("Unexpected cause: " +e, e.getCause() instanceof IgniteTxHeuristicException); + + checkEmpty(key); + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param keys Keys. + * @throws Exception If failed. + */ + private void checkPutAll(boolean putBefore, Integer ... keys) throws Exception { + assert keys.length > 1; + + if (putBefore) { + idxSpi.forceFail(false); + + Map<Integer, Integer> m = new HashMap<>(); + + for (Integer key : keys) + m.put(key, 1); + + info("Put data: " + m); + + grid(0).cache(null).putAll(m); + } + + // Execute get from all nodes to create readers for near cache. + for (int i = 0; i < gridCount(); i++) { + for (Integer key : keys) + grid(i).cache(null).get(key); + } + + idxSpi.forceFail(true); + + final Map<Integer, Integer> m = new HashMap<>(); + + for (Integer key : keys) + m.put(key, 2); + + info("Going to putAll: " + m); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + grid(0).cache(null).putAll(m); + + return null; + } + }, IgniteTxHeuristicException.class, null); + + for (Integer key : m.keySet()) + checkEmpty(key); + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param key Key. + * @throws Exception If failed. + */ + private void checkRemove(boolean putBefore, final Integer key) throws Exception { + if (putBefore) { + idxSpi.forceFail(false); + + info("Put key: " + key); + + grid(0).cache(null).put(key, 1); + } + + // Execute get from all nodes to create readers for near cache. + for (int i = 0; i < gridCount(); i++) + grid(i).cache(null).get(key); + + idxSpi.forceFail(true); + + info("Going to remove: " + key); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + grid(0).cache(null).remove(key); + + return null; + } + }, IgniteTxHeuristicException.class, null); + + checkEmpty(key); + } + + /** + * Generates key of a given type for given node. + * + * @param node Node. + * @param type Key type. + * @return Key. + */ + private Integer keyForNode(ClusterNode node, int type) { + GridCache<Integer, Integer> cache = grid(0).cache(null); + + if (cache.configuration().getCacheMode() == LOCAL) + return ++lastKey; + + if (cache.configuration().getCacheMode() == REPLICATED && type == NOT_PRIMARY_AND_BACKUP) + return ++lastKey; + + for (int key = lastKey + 1; key < (lastKey + 10_000); key++) { + switch (type) { + case NOT_PRIMARY_AND_BACKUP: { + if (!cache.affinity().isPrimaryOrBackup(node, key)) { + lastKey = key; + + return key; + } + + break; + } + + case PRIMARY: { + if (cache.affinity().isPrimary(node, key)) { + lastKey = key; + + return key; + } + + break; + } + + case BACKUP: { + if (cache.affinity().isBackup(node, key)) { + lastKey = key; + + return key; + } + + break; + } + + default: + fail(); + } + } + + throw new IllegalStateException("Failed to find key."); + } + + /** + * Indexing SPI that can fail on demand. + */ + private static class TestIndexingSpi extends IgniteSpiAdapter implements GridIndexingSpi { + /** Fail flag. */ + private volatile boolean fail; + + /** + * @param fail Fail flag. + */ + public void forceFail(boolean fail) { + this.fail = fail; + } + + /** {@inheritDoc} */ + @Override public Iterator<?> query(@Nullable String spaceName, Collection<Object> params, @Nullable GridIndexingQueryFilter filters) throws IgniteSpiException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void store(@Nullable String spaceName, Object key, Object val, long expirationTime) + throws IgniteSpiException { + if (fail) { + fail = false; + + throw new IgniteSpiException("Test exception."); + } + } + + /** {@inheritDoc} */ + @Override public void remove(@Nullable String spaceName, Object k) + throws IgniteSpiException { + if (fail) { + fail = false; + + throw new IgniteSpiException("Test exception."); + } + } + + /** {@inheritDoc} */ + @Override public void onSwap(@Nullable String spaceName, Object key) throws IgniteSpiException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onUnswap(@Nullable String spaceName, Object key, Object val) throws IgniteSpiException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + // No-op. + } + } +}