// http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiNodeAbstractTest.java
// ----------------------------------------------------------------------
// diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiNodeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiNodeAbstractTest.java
// new file mode 100644
// index 0000000..83bfe27
// --- /dev/null
// +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiNodeAbstractTest.java
// @@ -0,0 +1,927 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache;

import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cache.affinity.*;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.transactions.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.testframework.junits.common.*;
import org.jetbrains.annotations.*;

import java.util.*;
import java.util.concurrent.atomic.*;

import static org.apache.ignite.cache.GridCacheMode.*;
import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
import static org.apache.ignite.transactions.IgniteTxIsolation.*;


/**
 * Checks basic multi-node transactional operations.
 * <p>
 * Every test starts {@link #GRID_CNT} grids and runs {@code PESSIMISTIC}/{@code REPEATABLE_READ}
 * transactions that read (and optionally update) a shared counter key and put or remove
 * per-iteration item keys, either from a compute job or from one thread per grid.
 */
@SuppressWarnings({"PointlessBooleanExpression", "ConstantConditions", "PointlessArithmeticExpression"})
public abstract class IgniteTxMultiNodeAbstractTest extends GridCommonAbstractTest {
    /** Debug flag. When set, every cache operation is preceded by a verbose log message. */
    private static final boolean DEBUG = false;

    /** Number of grids started by each test. */
    protected static final int GRID_CNT = 4;

    /** IP finder shared by all started grids. */
    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);

    /** Number of transactions executed per grid (or per job). */
    protected static final int RETRIES = 300;

    /** Log frequency. */
    private static final int LOG_FREQ = RETRIES < 100 || DEBUG ? 1 : RETRIES / 5;

    /** Counter key. */
    private static final String CNTR_KEY = "CNTR_KEY";

    /** Removed counter key. */
    private static final String RMVD_CNTR_KEY = "RMVD_CNTR_KEY";

    /** Global iteration counter of the put tests, used to throttle logging. */
    protected static final AtomicInteger cntr = new AtomicInteger();

    /** Global counter of the remove tests; also generates item keys in the simple remove test. */
    protected static final AtomicInteger cntrRmvd = new AtomicInteger();

    /** Number of backups for partitioned tests. */
    protected int backups = 2;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        TcpDiscoverySpi spi = new TcpDiscoverySpi();

        spi.setIpFinder(ipFinder);

        cfg.setDiscoverySpi(spi);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        backups = 0;

        // Both counters are static and therefore shared between tests - reset them together.
        cntr.set(0);
        cntrRmvd.set(0);
    }

    /**
     * Resolves the node that comes first in the affinity mapping of the key's partition.
     *
     * @param ignite Grid.
     * @param key Key.
     * @return Primary node id.
     */
    @SuppressWarnings("unchecked")
    private static UUID primaryId(Ignite ignite, Object key) {
        GridCacheAffinity aff = ignite.cache(null).cache().affinity();

        Collection<ClusterNode> affNodes = aff.mapPartitionToPrimaryAndBackups(aff.partition(key));

        ClusterNode first = F.first(affNodes);

        assert first != null;

        return first.id();
    }

    /**
     * @param nodeId Node ID.
     * @param key Key.
     * @return DHT entry.
     */
    @Nullable private static GridCacheEntryEx<Object, Integer> dhtEntry(UUID nodeId, Object key) {
        Ignite g = G.ignite(nodeId);

        GridDhtCacheAdapter<Object, Integer> dht =
            ((GridKernal)g).<Object, Integer>internalCache().context().near().dht();

        return dht.peekEx(key);
    }

    /**
     * @param nodeId Node ID.
     * @param key Key.
     * @return Near entry.
     */
    @Nullable private static GridCacheEntryEx<Object, Integer> nearEntry(UUID nodeId, Object key) {
        Ignite g = G.ignite(nodeId);

        GridNearCacheAdapter<Object, Integer> near = ((GridKernal)g).<Object, Integer>internalCache().context().near();

        return near.peekEx(key);
    }

    /**
     * Runs one transaction for an item key whose primary node is not the local node: reads the
     * counter, optionally stores the incremented counter and stores the incremented value under
     * the item key.
     *
     * @param putCntr Put counter to cache.
     * @param ignite Grid.
     * @param itemKey Item key.
     * @param retry Retry count.
     * @throws IgniteCheckedException If failed.
     */
    @SuppressWarnings("unchecked")
    private void onItemNear(boolean putCntr, Ignite ignite, String itemKey, int retry) throws IgniteCheckedException {
        GridCache<String, Integer> cache = ignite.cache(null);

        UUID locId = ignite.cluster().localNode().id();
        UUID itemPrimaryId = primaryId(ignite, itemKey);
        UUID cntrPrimaryId = primaryId(ignite, CNTR_KEY);

        boolean isCntrPrimary = cntrPrimaryId.equals(locId);

        try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
            if (DEBUG)
                info("Before near get [retry=" + retry + ", xid=" + tx.xid() + ", node=" + ignite.name() +
                    ", isCntrPrimary=" + isCntrPrimary + ", nearId=" + locId +
                    ", nearEntry=" + nearEntry(locId, CNTR_KEY) +
                    (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, CNTR_KEY) : "") + ']');

            Integer cntr = cache.get(CNTR_KEY);

            // Fail with a descriptive message instead of a bare NPE on unboxing below.
            assert cntr != null : "Received null counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
                ", nearEntry=" + nearEntry(locId, CNTR_KEY) +
                (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, CNTR_KEY) : "") + ']';

            int newVal = cntr + 1;

            if (putCntr) {
                if (DEBUG)
                    info("Before near put counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
                        ", cur=" + cntr + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, CNTR_KEY) +
                        (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, CNTR_KEY) : "") + ']');

                cache.putx(CNTR_KEY, newVal);
            }

            if (DEBUG)
                info("Before near put item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr + ", new=" + newVal +
                    ", nearEntry=" + nearEntry(locId, itemKey) + ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');

            cache.putx(itemKey, newVal);

            if (DEBUG)
                info("After near put item [retry=" + retry + ", key=" + itemKey + ", old=" + cntr + ", new=" + newVal +
                    ", nearEntry=" + nearEntry(locId, itemKey) + ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');

            tx.commit();
        }
    }

    /**
     * Runs one transaction for an item key whose primary node is the local node: reads the
     * counter, optionally stores the incremented counter and stores a value under the item key.
     *
     * @param putCntr Put counter to cache.
     * @param ignite Grid.
     * @param itemKey Item key.
     * @param retry Retry count.
     * @throws IgniteCheckedException If failed.
     */
    @SuppressWarnings("unchecked")
    private void onItemPrimary(boolean putCntr, Ignite ignite, String itemKey, int retry) throws IgniteCheckedException {
        GridCache<String, Integer> cache = ignite.cache(null);

        UUID locId = ignite.cluster().localNode().id();
        UUID itemPrimaryId = primaryId(ignite, itemKey);
        UUID cntrPrimaryId = primaryId(ignite, CNTR_KEY);

        boolean isCntrPrimary = cntrPrimaryId.equals(locId);

        try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
            if (DEBUG)
                info("Before item primary get [retry=" + retry + ", xid=" + tx.xid() + ", node=" + ignite.name() +
                    ", isCntrPrimary=" + isCntrPrimary + ", nearId=" + locId +
                    ", nearEntry=" + nearEntry(locId, CNTR_KEY) +
                    (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, CNTR_KEY) : "") + ']');

            Integer cntr = cache.get(CNTR_KEY);

            // Fail with a descriptive message instead of a bare NPE on unboxing below.
            assert cntr != null : "Received null counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
                ", nearEntry=" + nearEntry(locId, CNTR_KEY) +
                (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, CNTR_KEY) : "") + ']';

            int newVal = cntr + 1;

            if (putCntr) {
                if (DEBUG)
                    info("Before item primary put counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
                        ", cur=" + cntr + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, CNTR_KEY) +
                        (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, CNTR_KEY) : "") + ']');

                cache.putx(CNTR_KEY, newVal);
            }

            if (DEBUG)
                info("Before item primary put item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr +
                    ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) +
                    ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');

            // NOTE(review): stores the current counter, while onItemNear() stores newVal and the
            // surrounding log messages report "new=newVal" - confirm which value is intended.
            cache.putx(itemKey, cntr);

            if (DEBUG)
                info("After item primary put item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr +
                    ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) +
                    ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');

            tx.commit();
        }
    }

    /**
     * Runs one transaction that reads the removed-items counter, optionally stores the decremented
     * counter, and removes one item found with an SQL query. If the query returns nothing the
     * method returns without committing.
     *
     * @param putCntr Put counter to cache.
     * @param ignite Grid.
     * @param retry Retry count.
     * @throws IgniteCheckedException If failed.
     */
    @SuppressWarnings("unchecked")
    private void onRemoveItemQueried(boolean putCntr, Ignite ignite, int retry) throws IgniteCheckedException {
        GridCache<String, Integer> cache = ignite.cache(null);

        UUID locId = ignite.cluster().localNode().id();
        UUID cntrPrimaryId = primaryId(ignite, RMVD_CNTR_KEY);

        boolean isCntrPrimary = cntrPrimaryId.equals(locId);

        try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
            if (DEBUG)
                ignite.log().info("Before item lock [retry=" + retry + ", xid=" + tx.xid() + ", node=" + ignite.name() +
                    ", isCntrPrimary=" + isCntrPrimary + ", nearId=" + locId +
                    ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) +
                    (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']');

            Integer cntr = cache.get(RMVD_CNTR_KEY);

            assert cntr != null : "Received null counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
                ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) +
                (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']';

            int newVal = cntr - 1;

            if (putCntr) {
                if (DEBUG)
                    ignite.log().info("Before item put counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
                        ", cur=" + cntr + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) +
                        (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']');

                cache.putx(RMVD_CNTR_KEY, newVal);
            }

            // Re-query until an item that is still visible through the cache has been removed.
            while (true) {
                GridCacheQuery<Map.Entry<String, Integer>> qry =
                    cache.queries().createSqlQuery(Integer.class, "_key != 'RMVD_CNTR_KEY' and _val >= 0");

                if (DEBUG)
                    ignite.log().info("Before executing query [retry=" + retry + ", locId=" + locId +
                        ", txId=" + tx.xid() + ']');

                Map.Entry<String, Integer> entry = qry.execute().next();

                if (entry == null) {
                    ignite.log().info("*** Queue is empty.");

                    // Leaves the try-with-resources block without commit.
                    return;
                }

                String itemKey = entry.getKey();

                UUID itemPrimaryId = primaryId(ignite, itemKey);

                // Lock the item key.
                if (cache.get(itemKey) != null) {
                    if (DEBUG)
                        ignite.log().info("Before item remove [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr +
                            ", nearEntry=" + nearEntry(locId, itemKey) +
                            ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');

                    // The removal must not live inside the assert expression: with assertions
                    // disabled it would never be executed.
                    boolean rmvd = cache.removex(itemKey);

                    assert rmvd : "Failed to remove key [locId=" + locId +
                        ", primaryId=" + itemPrimaryId + ", key=" + itemKey + ']';

                    if (DEBUG)
                        info("After item remove item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr +
                            ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) +
                            ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');

                    break;
                }
                else
                    // The query returned a key the cache no longer has a value for; remove it and query again.
                    cache.removex(itemKey);
            }

            tx.commit();
        }
        catch (Error e) {
            ignite.log().error("Error in test.", e);

            throw e;
        }
    }

    /**
     * Runs one transaction that reads the removed-items counter, optionally stores the decremented
     * counter, and removes the item whose key is the next value of {@link #cntrRmvd}.
     *
     * @param putCntr Put counter to cache.
     * @param ignite Grid.
     * @param retry Retry count.
     * @throws IgniteCheckedException If failed.
     */
    @SuppressWarnings("unchecked")
    private void onRemoveItemSimple(boolean putCntr, Ignite ignite, int retry) throws IgniteCheckedException {
        GridCache<String, Integer> cache = ignite.cache(null);

        UUID locId = ignite.cluster().localNode().id();
        UUID cntrPrimaryId = primaryId(ignite, RMVD_CNTR_KEY);

        boolean isCntrPrimary = cntrPrimaryId.equals(locId);

        try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
            if (DEBUG)
                ignite.log().info("Before item lock [retry=" + retry + ", xid=" + tx.xid() + ", node=" + ignite.name() +
                    ", isCntrPrimary=" + isCntrPrimary + ", nearId=" + locId +
                    ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) +
                    (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']');

            Integer cntr = cache.get(RMVD_CNTR_KEY);

            assert cntr != null : "Received null counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
                ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) +
                (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']';

            String itemKey = Integer.toString(cntrRmvd.getAndIncrement());

            Integer val = cache.get(itemKey);

            assert val != null : "Received null val [retry=" + retry + ", cacheSize=" + cache.size() + ']';

            UUID itemPrimaryId = primaryId(ignite, itemKey);

            int newVal = cntr - 1;

            if (putCntr) {
                if (DEBUG)
                    ignite.log().info("Before item put counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary +
                        ", cur=" + cntr + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) +
                        (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']');

                cache.putx(RMVD_CNTR_KEY, newVal);
            }

            if (DEBUG)
                ignite.log().info("Before item remove item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr +
                    ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) +
                    ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');

            assertTrue(cache.removex(itemKey));

            if (DEBUG)
                info("After item put item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr +
                    ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) +
                    ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']');

            tx.commit();
        }
        catch (Error e) {
            ignite.log().error("Error in test.", e);

            throw e;
        }
    }

    /**
     * Executes {@link #RETRIES} put transactions on the given grid, choosing the primary or the
     * near code path depending on whether the local node is primary for the generated item key.
     *
     * @param putCntr Put counter to cache.
     * @param ignite Grid.
     * @throws IgniteCheckedException If failed.
     */
    private void retries(Ignite ignite, boolean putCntr) throws IgniteCheckedException {
        UUID nodeId = ignite.cluster().localNode().id();

        for (int i = 0; i < RETRIES; i++) {
            int cnt = cntr.getAndIncrement();

            if (DEBUG)
                ignite.log().info("***");
            if (DEBUG || cnt % LOG_FREQ == 0)
                ignite.log().info("*** Iteration #" + i + " ***");
            if (DEBUG)
                ignite.log().info("***");

            String itemKey = nodeId + "-#" + i;

            if (nodeId.equals(primaryId(ignite, itemKey)))
                onItemPrimary(putCntr, ignite, itemKey, i);
            else
                onItemNear(putCntr, ignite, itemKey, i);
        }
    }

    /**
     * Executes {@link #RETRIES} query-based remove transactions on the given grid and prints
     * transaction manager memory statistics every 50 iterations.
     *
     * @param putCntr Put counter to cache.
     * @param ignite Grid.
     * @throws IgniteCheckedException If failed.
     */
    private void removeRetriesQueried(Ignite ignite, boolean putCntr) throws IgniteCheckedException {
        for (int i = 0; i < RETRIES; i++) {
            if (DEBUG)
                ignite.log().info("***");

            if (DEBUG || cntrRmvd.getAndIncrement() % LOG_FREQ == 0)
                ignite.log().info("*** Iteration #" + i + " ***");

            if (DEBUG)
                ignite.log().info("***");

            onRemoveItemQueried(putCntr, ignite, i);

            if (i % 50 == 0)
                ((GridKernal) ignite).internalCache().context().tm().printMemoryStats();
        }
    }

    /**
     * Executes {@link #RETRIES} key-based remove transactions on the given grid.
     *
     * @param putCntr Put counter to cache.
     * @param ignite Grid.
     * @throws IgniteCheckedException If failed.
     */
    private void removeRetriesSimple(Ignite ignite, boolean putCntr) throws IgniteCheckedException {
        for (int i = 0; i < RETRIES; i++) {
            if (DEBUG)
                ignite.log().info("***");

            if (cntrRmvd.get() % LOG_FREQ == 0 || DEBUG)
                ignite.log().info("*** Iteration #" + i + " ***");

            if (DEBUG)
                ignite.log().info("***");

            onRemoveItemSimple(putCntr, ignite, i);
        }
    }

    /**
     * Starts all given threads, waits for them to finish and rethrows on the calling thread the
     * first failure the workers recorded. Without this a failure inside a worker would only reach
     * the thread's uncaught exception handler and the test would still pass.
     *
     * @param threads Worker threads (not started yet).
     * @param err Holder the workers store their first failure in.
     * @throws Exception If any worker failed or the wait was interrupted.
     */
    private static void startAndJoin(Iterable<Thread> threads, AtomicReference<Throwable> err) throws Exception {
        for (Thread th : threads)
            th.start();

        for (Thread th : threads)
            th.join();

        Throwable t = err.get();

        if (t instanceof Error)
            throw (Error)t;

        if (t instanceof Exception)
            throw (Exception)t;

        if (t != null)
            throw new Exception(t);
    }

    /**
     * Runs the put job with counter updates disabled.
     *
     * @throws Exception If failed.
     */
    public void testPutOneEntryInTx() throws Exception {
        try {
            startGrids(GRID_CNT);

            grid(0).cache(null).put(CNTR_KEY, 0);

            grid(0).compute().call(new PutOneEntryInTxJob());
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Runs the put job with counter updates enabled and checks the final counter value.
     *
     * @throws Exception If failed.
     */
    public void testPutTwoEntriesInTx() throws Exception {
        try {
            startGrids(GRID_CNT);

            grid(0).cache(null).put(CNTR_KEY, 0);

            grid(0).compute().call(new PutTwoEntriesInTxJob());

            printCounter();

            assertEquals(GRID_CNT * RETRIES, grid(0).cache(null).get(CNTR_KEY));
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Runs put transactions (counter updates disabled) concurrently, one thread per grid.
     *
     * @throws Exception If failed.
     */
    public void testPutOneEntryInTxMultiThreaded() throws Exception {
        try {
            startGrids(GRID_CNT);

            Collection<Thread> threads = new ArrayList<>(GRID_CNT);

            final AtomicReference<Throwable> err = new AtomicReference<>();

            // Initialize.
            grid(0).cache(null).put(CNTR_KEY, 0);

            for (int i = 0; i < GRID_CNT; i++) {
                final int gridId = i;

                threads.add(new Thread("thread-#" + i) {
                    @Override public void run() {
                        try {
                            retries(grid(gridId), false);
                        }
                        catch (Throwable e) {
                            // Keep the first failure so that the test thread can rethrow it.
                            err.compareAndSet(null, e);
                        }
                    }
                });
            }

            startAndJoin(threads, err);

            printCounter();
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Runs put transactions (counter updates enabled) concurrently, one thread per grid, and
     * checks the final counter value.
     *
     * @throws Exception If failed.
     */
    public void testPutTwoEntryInTxMultiThreaded() throws Exception {
        try {
            startGrids(GRID_CNT);

            Collection<Thread> threads = new ArrayList<>(GRID_CNT);

            final AtomicReference<Throwable> err = new AtomicReference<>();

            grid(0).cache(null).put(CNTR_KEY, 0);

            for (int i = 0; i < GRID_CNT; i++) {
                final int gridId = i;

                threads.add(new Thread() {
                    @Override public void run() {
                        try {
                            retries(grid(gridId), true);
                        }
                        catch (Throwable e) {
                            // Keep the first failure so that the test thread can rethrow it.
                            err.compareAndSet(null, e);
                        }
                    }
                });
            }

            startAndJoin(threads, err);

            printCounter();

            assertEquals(GRID_CNT * RETRIES, grid(0).cache(null).get(CNTR_KEY));
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Populates the cache, runs the query-based remove job and checks that all items are gone
     * and the removed counter has the expected value.
     *
     * @throws Exception If failed.
     */
    public void testRemoveInTxQueried() throws Exception {
        try {
            startGrids(GRID_CNT);

            GridCache<String, Integer> cache = grid(0).cache(null);

            cache.put(RMVD_CNTR_KEY, 0);

            for (int i = 0; i < GRID_CNT * RETRIES; i++)
                cache.put(String.valueOf(i), i);

            for (int i = 0; i < RETRIES; i++)
                for (int j = 0; j < GRID_CNT; j++)
                    assertEquals(i, grid(j).cache(null).get(String.valueOf(i)));

            GridCacheQuery<Map.Entry<String, Integer>> qry = cache.queries().createSqlQuery(Integer.class, " _val >= 0");

            Collection<Map.Entry<String, Integer>> entries = qry.execute().get();

            assertFalse(entries.isEmpty());

            cntrRmvd.set(0);

            grid(0).compute().call(new RemoveInTxJobQueried());

            for (int i = 0; i < GRID_CNT * RETRIES; i++)
                for (int ii = 0; ii < GRID_CNT; ii++)
                    assertEquals(null, grid(ii).cache(null).get(Integer.toString(i)));

            assertEquals(-GRID_CNT * RETRIES, grid(0).cache(null).peek(RMVD_CNTR_KEY));
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Populates the cache, runs the key-based remove job and checks, through both the cache API
     * and a query, that all items are gone and the removed counter has the expected value.
     *
     * @throws Exception If failed.
     */
    public void testRemoveInTxSimple() throws Exception {
        try {
            startGrids(GRID_CNT);

            GridCache<String, Integer> cache = grid(0).cache(null);

            cache.put(RMVD_CNTR_KEY, 0);

            for (int i = 0; i < GRID_CNT * RETRIES; i++)
                cache.put(Integer.toString(i), i);

            for (int i = 0; i < RETRIES; i++)
                for (int j = 0; j < GRID_CNT; j++)
                    assertEquals(i, grid(j).cache(null).get(Integer.toString(i)));

            GridCacheQuery<Map.Entry<String, Integer>> qry = cache.queries().createSqlQuery(Integer.class, " _val >= 0");

            Collection<Map.Entry<String, Integer>> entries = qry.execute().get();

            assertFalse(entries.isEmpty());

            cntrRmvd.set(0);

            grid(0).compute().call(new RemoveInTxJobSimple());

            // Check using cache.
            for (int i = 0; i < GRID_CNT * RETRIES; i++)
                for (int ii = 0; ii < GRID_CNT; ii++)
                    assertEquals(null, grid(ii).cache(null).get(Integer.toString(i)));

            // Check using query.
            entries = qry.execute().get();

            assertTrue(entries.isEmpty());

            assertEquals(-GRID_CNT * RETRIES, grid(0).cache(null).peek(RMVD_CNTR_KEY));
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Populates the cache, verifies the initial query result and then runs query-based remove
     * transactions concurrently, one thread per grid.
     *
     * @throws Exception If failed.
     */
    public void testRemoveInTxQueriedMultiThreaded() throws Exception {
        backups = 1;

        try {
            startGrids(GRID_CNT);

            GridCache<String, Integer> cache = grid(0).cache(null);

            // Store counter.
            cache.put(RMVD_CNTR_KEY, 0);

            // Store values.
            for (int i = 1; i <= GRID_CNT * RETRIES; i++)
                cache.put(String.valueOf(i), i);

            for (int j = 0; j < GRID_CNT; j++)
                assertEquals(0, grid(j).cache(null).get(RMVD_CNTR_KEY));

            for (int i = 1; i <= RETRIES; i++)
                for (int j = 0; j < GRID_CNT; j++)
                    assertEquals(i, grid(j).cache(null).get(String.valueOf(i)));

            GridCacheQuery<Map.Entry<String, Integer>> qry = cache.queries().createSqlQuery(Integer.class, "_val >= 0");

            // Load all results.
            qry.keepAll(true);
            qry.includeBackups(false);

            // NOTE: for replicated cache includeBackups(false) is not enough since
            // all nodes are considered primary, so we have to deduplicate result set.
            if (cache.configuration().getCacheMode() == REPLICATED)
                qry.enableDedup(true);

            List<Map.Entry<String, Integer>> entries =
                new ArrayList<>(qry.execute().get());

            Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
                @Override public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
                    return o1.getValue().compareTo(o2.getValue());
                }
            });

            info("Queried entries: " + entries);

            // Sorted values must form the sequence 0, 1, 2, ... (the counter entry holds 0).
            int val = 0;

            for (Map.Entry<String, Integer> e : entries) {
                assertEquals(val, e.getValue().intValue());

                val++;
            }

            assertFalse(entries.isEmpty());

            cntrRmvd.set(0);

            Collection<Thread> threads = new ArrayList<>(GRID_CNT);

            final AtomicReference<Throwable> err = new AtomicReference<>();

            for (int i = 0; i < GRID_CNT; i++) {
                final int gridId = i;

                threads.add(new Thread() {
                    @Override public void run() {
                        try {
                            removeRetriesQueried(grid(gridId), true);
                        }
                        catch (Throwable e) {
                            // Keep the first failure so that the test thread can rethrow it.
                            err.compareAndSet(null, e);
                        }
                    }
                });
            }

            startAndJoin(threads, err);

            for (int i = 0; i < GRID_CNT * RETRIES; i++)
                for (int ii = 0; ii < GRID_CNT; ii++)
                    assertEquals("Got invalid value from cache [gridIdx=" + ii + ", key=" + i + ']',
                        null, grid(ii).cache(null).get(Integer.toString(i)));

            assertEquals(-GRID_CNT * RETRIES, grid(0).cache(null).peek(RMVD_CNTR_KEY));
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Logs the counter value as returned by both {@code peek} and {@code get} on the first grid.
     *
     * @throws IgniteCheckedException If failed.
     */
    private void printCounter() throws IgniteCheckedException {
        info("***");
        info("*** Peeked counter: " + grid(0).cache(null).peek(CNTR_KEY));
        info("*** Got counter: " + grid(0).cache(null).get(CNTR_KEY));
        info("***");
    }

    /**
     * Test job running put transactions that also update the counter.
     */
    protected class PutTwoEntriesInTxJob implements IgniteCallable<Integer> {
        /** Injected grid instance. */
        @GridToStringExclude
        @IgniteInstanceResource
        private Ignite ignite;

        /** {@inheritDoc} */
        @SuppressWarnings("unchecked")
        @Override public Integer call() throws IgniteCheckedException {
            assertNotNull(ignite);

            ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]");

            retries(ignite, true);

            return 0;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(PutTwoEntriesInTxJob.class, this);
        }
    }

    /**
     * Test job running put transactions that do not update the counter.
     */
    protected class PutOneEntryInTxJob implements IgniteCallable<Integer> {
        /** Injected grid instance. */
        @GridToStringExclude
        @IgniteInstanceResource
        private Ignite ignite;

        /** {@inheritDoc} */
        @SuppressWarnings("unchecked")
        @Override public Integer call() throws IgniteCheckedException {
            assertNotNull(ignite);

            ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]");

            retries(ignite, false);

            return 0;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(PutOneEntryInTxJob.class, this);
        }
    }

    /**
     * Test job removing data from cache using query.
     */
    protected class RemoveInTxJobQueried implements IgniteCallable<Integer> {
        /** Injected grid instance. */
        @GridToStringExclude
        @IgniteInstanceResource
        private Ignite ignite;

        /** {@inheritDoc} */
        @SuppressWarnings("unchecked")
        @Override public Integer call() throws IgniteCheckedException {
            assertNotNull(ignite);

            ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]");

            removeRetriesQueried(ignite, true);

            return 0;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(RemoveInTxJobQueried.class, this);
        }
    }

    /**
     * Test job removing data from cache.
     */
    protected class RemoveInTxJobSimple implements IgniteCallable<Integer> {
        /** Injected grid instance. */
        @GridToStringExclude
        @IgniteInstanceResource
        private Ignite ignite;

        /** {@inheritDoc} */
        @SuppressWarnings("unchecked")
        @Override public Integer call() throws IgniteCheckedException {
            assertNotNull(ignite);

            ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]");

            removeRetriesSimple(ignite, true);

            return 0;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(RemoveInTxJobSimple.class, this);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java new file mode 100644 index 0000000..af541c7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.testframework.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Tests for local transactions. 
+ */ +@SuppressWarnings( {"BusyWait"}) +public abstract class IgniteTxMultiThreadedAbstractTest extends IgniteTxAbstractTest { + /** + * @return Thread count. + */ + protected abstract int threadCount(); + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. + * @throws Exception If check failed. + */ + protected void checkCommitMultithreaded(final IgniteTxConcurrency concurrency, + final IgniteTxIsolation isolation) throws Exception { + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + Thread t = Thread.currentThread(); + + t.setName(t.getName() + "-id-" + t.getId()); + + info("Starting commit thread: " + Thread.currentThread().getName()); + + try { + checkCommit(concurrency, isolation); + } + finally { + info("Finished commit thread: " + Thread.currentThread().getName()); + } + + return null; + } + }, threadCount(), concurrency + "-" + isolation); + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. + * @throws Exception If check failed. + */ + protected void checkRollbackMultithreaded(final IgniteTxConcurrency concurrency, + final IgniteTxIsolation isolation) throws Exception { + final ConcurrentMap<Integer, String> map = new ConcurrentHashMap<>(); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + Thread t = Thread.currentThread(); + + t.setName(t.getName() + "-id-" + t.getId()); + + info("Starting rollback thread: " + Thread.currentThread().getName()); + + try { + checkRollback(map, concurrency, isolation); + + return null; + } + finally { + info("Finished rollback thread: " + Thread.currentThread().getName()); + } + } + }, threadCount(), concurrency + "-" + isolation); + } + + /** + * @throws IgniteCheckedException If test failed. 
+ */ + public void testPessimisticReadCommittedCommitMultithreaded() throws Exception { + checkCommitMultithreaded(PESSIMISTIC, READ_COMMITTED); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticRepeatableReadCommitMultithreaded() throws Exception { + checkCommitMultithreaded(PESSIMISTIC, REPEATABLE_READ); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticSerializableCommitMultithreaded() throws Exception { + checkCommitMultithreaded(PESSIMISTIC, SERIALIZABLE); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticReadCommittedCommitMultithreaded() throws Exception { + checkCommitMultithreaded(OPTIMISTIC, READ_COMMITTED); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticRepeatableReadCommitMultithreaded() throws Exception { + checkCommitMultithreaded(OPTIMISTIC, REPEATABLE_READ); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticSerializableCommitMultithreaded() throws Exception { + checkCommitMultithreaded(OPTIMISTIC, SERIALIZABLE); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticReadCommittedRollbackMultithreaded() throws Exception { + checkRollbackMultithreaded(PESSIMISTIC, READ_COMMITTED); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticRepeatableReadRollbackMultithreaded() throws Exception { + checkRollbackMultithreaded(PESSIMISTIC, REPEATABLE_READ); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. 
+ */ + public void testPessimisticSerializableRollbackMultithreaded() throws Exception { + checkRollbackMultithreaded(PESSIMISTIC, SERIALIZABLE); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticReadCommittedRollbackMultithreaded() throws Exception { + checkRollbackMultithreaded(OPTIMISTIC, READ_COMMITTED); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticRepeatableReadRollbackMultithreaded() throws Exception { + checkRollbackMultithreaded(OPTIMISTIC, REPEATABLE_READ); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticSerializableRollbackMultithreaded() throws Exception { + checkRollbackMultithreaded(OPTIMISTIC, SERIALIZABLE); + + finalChecks(); + } + + /** + * @throws Exception If failed. + */ + // TODO: GG-8063, enabled when fixed. + public void _testOptimisticSerializableConsistency() throws Exception { + final GridCache<Integer, Long> cache = grid(0).cache(null); + + final int THREADS = 2; + + final int ITERATIONS = 100; + + final int key = 0; + + cache.put(key, 0L); + + List<IgniteFuture<Collection<Long>>> futs = new ArrayList<>(THREADS); + + for (int i = 0; i < THREADS; i++) { + futs.add(GridTestUtils.runAsync(new Callable<Collection<Long>>() { + @Override public Collection<Long> call() throws Exception { + Collection<Long> res = new ArrayList<>(); + + for (int i = 0; i < ITERATIONS; i++) { + while (true) { + try (IgniteTx tx = cache.txStart(OPTIMISTIC, SERIALIZABLE)) { + long val = cache.get(key); + + cache.put(key, val + 1); + + tx.commit(); + + assertTrue(res.add(val + 1)); + + break; + } + catch(IgniteTxOptimisticException e) { + log.info("Got error, will retry: " + e); + } + } + } + + return res; + } + })); + } + + List<Collection<Long>> cols = new ArrayList<>(THREADS); + + for (IgniteFuture<Collection<Long>> fut : futs) { + Collection<Long> col = fut.get(); + 
+ assertEquals(ITERATIONS, col.size()); + + cols.add(col); + } + + Set<Long> duplicates = new HashSet<>(); + + for (Collection<Long> col1 : cols) { + for (Long val1 : col1) { + for (Collection<Long> col2 : cols) { + if (col1 == col2) + continue; + + for (Long val2 : col2) { + if (val1.equals(val2)) { + duplicates.add(val2); + + break; + } + } + } + } + } + + assertTrue("Found duplicated values: " + duplicates, duplicates.isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxReentryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxReentryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxReentryAbstractSelfTest.java new file mode 100644 index 0000000..3e0561c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxReentryAbstractSelfTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Tests reentry in pessimistic repeatable read tx. + */ +public abstract class IgniteTxReentryAbstractSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** @return Cache mode. */ + protected abstract GridCacheMode cacheMode(); + + /** @return Near enabled. */ + protected abstract boolean nearEnabled(); + + /** @return Grid count. */ + protected abstract int gridCount(); + + /** @return Test key. */ + protected abstract int testKey(); + + /** @return Expected number of near lock requests. */ + protected abstract int expectedNearLockRequests(); + + /** @return Expected number of near lock requests. */ + protected abstract int expectedDhtLockRequests(); + + /** @return Expected number of near lock requests. 
*/ + protected abstract int expectedDistributedLockRequests(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setCommunicationSpi(new CountingCommunicationSpi()); + cfg.setDiscoverySpi(discoSpi); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(cacheMode()); + cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + cacheCfg.setDistributionMode(nearEnabled() ? NEAR_PARTITIONED : PARTITIONED_ONLY); + + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** @throws Exception If failed. */ + public void testLockReentry() throws Exception { + startGrids(gridCount()); + + try { + GridCache<Object, Object> cache = grid(0).cache(null); + + // Find test key. + int key = testKey(); + + try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + // One near lock request. + cache.get(key); + + // No more requests. + cache.remove(key); + + tx.commit(); + } + + CountingCommunicationSpi commSpi = (CountingCommunicationSpi)grid(0).configuration().getCommunicationSpi(); + + assertEquals(expectedNearLockRequests(), commSpi.nearLocks()); + assertEquals(expectedDhtLockRequests(), commSpi.dhtLocks()); + assertEquals(expectedDistributedLockRequests(), commSpi.distributedLocks()); + } + finally { + stopAllGrids(); + } + } + + /** Counting communication SPI. */ + protected static class CountingCommunicationSpi extends TcpCommunicationSpi { + /** Distributed lock requests. */ + private AtomicInteger distLocks = new AtomicInteger(); + + /** Near lock requests. */ + private AtomicInteger nearLocks = new AtomicInteger(); + + /** Dht locks. 
*/ + private AtomicInteger dhtLocks = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg) + throws IgniteSpiException { + countMsg((GridIoMessage)msg); + + super.sendMessage(node, msg); + } + + /** + * Unmarshals the message and increments counters. + * + * @param msg Message to check. + */ + private void countMsg(GridIoMessage msg) { + Object origMsg = msg.message(); + + if (origMsg instanceof GridDistributedLockRequest) { + distLocks.incrementAndGet(); + + if (origMsg instanceof GridNearLockRequest) + nearLocks.incrementAndGet(); + else if (origMsg instanceof GridDhtLockRequest) + dhtLocks.incrementAndGet(); + } + } + + /** @return Number of recorded distributed locks. */ + public int distributedLocks() { + return distLocks.get(); + } + + /** @return Number of recorded distributed locks. */ + public int nearLocks() { + return nearLocks.get(); + } + + /** @return Number of recorded distributed locks. */ + public int dhtLocks() { + return dhtLocks.get(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxSingleThreadedAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxSingleThreadedAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxSingleThreadedAbstractTest.java new file mode 100644 index 0000000..d26edc3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxSingleThreadedAbstractTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; + +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Tests for local transactions. + */ +@SuppressWarnings( {"BusyWait"}) +public abstract class IgniteTxSingleThreadedAbstractTest extends IgniteTxAbstractTest { + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticReadCommittedCommit() throws Exception { + checkCommit(PESSIMISTIC, READ_COMMITTED); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticRepeatableReadCommit() throws Exception { + checkCommit(PESSIMISTIC, REPEATABLE_READ); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticSerializableCommit() throws Exception { + checkCommit(PESSIMISTIC, SERIALIZABLE); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticReadCommittedCommit() throws Exception { + checkCommit(OPTIMISTIC, READ_COMMITTED); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. 
+ */ + public void testOptimisticRepeatableReadCommit() throws Exception { + checkCommit(OPTIMISTIC, REPEATABLE_READ); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticSerializableCommit() throws Exception { + checkCommit(OPTIMISTIC, SERIALIZABLE); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticReadCommittedRollback() throws Exception { + checkRollback(PESSIMISTIC, READ_COMMITTED); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticRepeatableReadRollback() throws Exception { + checkRollback(PESSIMISTIC, REPEATABLE_READ); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticSerializableRollback() throws Exception { + checkRollback(PESSIMISTIC, SERIALIZABLE); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticReadCommittedRollback() throws Exception { + checkRollback(OPTIMISTIC, READ_COMMITTED); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticRepeatableReadRollback() throws Exception { + checkRollback(OPTIMISTIC, REPEATABLE_READ); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. 
+ */ + public void testOptimisticSerializableRollback() throws Exception { + checkRollback(OPTIMISTIC, SERIALIZABLE); + + finalChecks(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java new file mode 100644 index 0000000..43eed3a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java @@ -0,0 +1,657 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.testframework.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import javax.cache.integration.*; +import javax.cache.processor.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.GridCacheMode.*; + +/** + * Tests that transaction is invalidated in case of {@link IgniteTxHeuristicException}. + */ +public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAbstractSelfTest { + /** Index SPI throwing exception. */ + private static TestStore store = new TestStore(); + + /** */ + private static final int PRIMARY = 0; + + /** */ + private static final int BACKUP = 1; + + /** */ + private static final int NOT_PRIMARY_AND_BACKUP = 2; + + /** */ + private static Integer lastKey; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.getTransactionsConfiguration().setTxSerializableEnabled(true); + + return cfg; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration ccfg = super.cacheConfiguration(gridName); + + ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + ccfg.setReadThrough(true); + ccfg.setWriteThrough(true); + ccfg.setLoadPreviousValue(true); + 
+ return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + lastKey = 0; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + store.forceFail(false); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testPutNear() throws Exception { + checkPut(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + + checkPut(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testPutPrimary() throws Exception { + checkPut(true, keyForNode(grid(0).localNode(), PRIMARY)); + + checkPut(false, keyForNode(grid(0).localNode(), PRIMARY)); + } + + /** + * @throws Exception If failed. + */ + public void testPutBackup() throws Exception { + checkPut(true, keyForNode(grid(0).localNode(), BACKUP)); + + checkPut(false, keyForNode(grid(0).localNode(), BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testPutAll() throws Exception { + checkPutAll(true, keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY)); + + checkPutAll(false, keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY)); + + if (gridCount() > 1) { + checkPutAll(true, keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY)); + + checkPutAll(false, keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY)); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testRemoveNear() throws Exception { + checkRemove(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + + checkRemove(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testRemovePrimary() throws Exception { + checkRemove(false, keyForNode(grid(0).localNode(), PRIMARY)); + + checkRemove(true, keyForNode(grid(0).localNode(), PRIMARY)); + } + + /** + * @throws Exception If failed. + */ + public void testRemoveBackup() throws Exception { + checkRemove(false, keyForNode(grid(0).localNode(), BACKUP)); + + checkRemove(true, keyForNode(grid(0).localNode(), BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testTransformNear() throws Exception { + checkTransform(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + + checkTransform(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testTransformPrimary() throws Exception { + checkTransform(false, keyForNode(grid(0).localNode(), PRIMARY)); + + checkTransform(true, keyForNode(grid(0).localNode(), PRIMARY)); + } + + /** + * @throws Exception If failed. + */ + public void testTransformBackup() throws Exception { + checkTransform(false, keyForNode(grid(0).localNode(), BACKUP)); + + checkTransform(true, keyForNode(grid(0).localNode(), BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testPutNearTx() throws Exception { + for (IgniteTxConcurrency concurrency : IgniteTxConcurrency.values()) { + for (IgniteTxIsolation isolation : IgniteTxIsolation.values()) { + checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + + checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + } + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testPutPrimaryTx() throws Exception { + for (IgniteTxConcurrency concurrency : IgniteTxConcurrency.values()) { + for (IgniteTxIsolation isolation : IgniteTxIsolation.values()) { + checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), PRIMARY)); + + checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), PRIMARY)); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testPutBackupTx() throws Exception { + for (IgniteTxConcurrency concurrency : IgniteTxConcurrency.values()) { + for (IgniteTxIsolation isolation : IgniteTxIsolation.values()) { + checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), BACKUP)); + + checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), BACKUP)); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testPutMultipleKeysTx() throws Exception { + for (IgniteTxConcurrency concurrency : IgniteTxConcurrency.values()) { + for (IgniteTxIsolation isolation : IgniteTxIsolation.values()) { + checkPutTx(true, concurrency, isolation, + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY)); + + checkPutTx(false, concurrency, isolation, + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY)); + + if (gridCount() > 1) { + checkPutTx(true, concurrency, isolation, + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY)); + + checkPutTx(false, concurrency, isolation, + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY)); + } + } + } + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param keys Keys. + * @param concurrency Transaction concurrency. 
+ * @param isolation Transaction isolation. + * @throws Exception If failed. + */ + private void checkPutTx(boolean putBefore, IgniteTxConcurrency concurrency, + IgniteTxIsolation isolation, final Integer... keys) throws Exception { + assertTrue(keys.length > 0); + + info("Test transaction [concurrency=" + concurrency + ", isolation=" + isolation + ']'); + + GridCache<Integer, Integer> cache = grid(0).cache(null); + + if (putBefore) { + store.forceFail(false); + + info("Start transaction."); + + try (IgniteTx tx = cache.txStart(concurrency, isolation)) { + for (Integer key : keys) { + info("Put " + key); + + cache.put(key, 1); + } + + info("Commit."); + + tx.commit(); + } + } + + // Execute get from all nodes to create readers for near cache. + for (int i = 0; i < gridCount(); i++) { + for (Integer key : keys) + grid(i).cache(null).get(key); + } + + store.forceFail(true); + + try { + info("Start transaction."); + + try (IgniteTx tx = cache.txStart(concurrency, isolation)) { + for (Integer key : keys) { + info("Put " + key); + + cache.put(key, 2); + } + + info("Commit."); + + tx.commit(); + } + + fail("Transaction should fail."); + } + catch (IgniteCheckedException e) { + log.info("Expected exception: " + e); + } + + for (Integer key : keys) + checkValue(key, putBefore); + } + + /** + * @param key Key. + * @throws Exception If failed. 
+ */ + private void checkValue(final Integer key, boolean putBefore) throws Exception { + store.forceFail(false); + + info("Check key: " + key); + + for (int i = 0; i < gridCount(); i++) { + GridKernal grid = (GridKernal) grid(i); + + GridCacheAdapter cache = grid.internalCache(null); + + GridCacheMapEntry entry = cache.map().getEntry(key); + + log.info("Entry: " + entry); + + if (entry != null) { + assertFalse("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', entry.lockedByAny()); + assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore, + entry.hasValue()); + assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore ? 1 : null, + entry.rawGetOrUnmarshal(false)); + } + + if (cache.isNear()) { + entry = ((GridNearCacheAdapter)cache).dht().map().getEntry(key); + + log.info("Dht entry: " + entry); + + if (entry != null) { + assertFalse("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', entry.lockedByAny()); + assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore, + entry.hasValue()); + assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore ? 1 : null, + entry.rawGetOrUnmarshal(false)); + } + } + } + + for (int i = 0; i < gridCount(); i++) + assertEquals("Unexpected value for grid " + i, putBefore ? 1 : null, grid(i).cache(null).get(key)); + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param key Key. + * @throws Exception If failed. + */ + private void checkPut(boolean putBefore, final Integer key) throws Exception { + if (putBefore) { + store.forceFail(false); + + info("Put key: " + key); + + grid(0).cache(null).put(key, 1); + } + + // Execute get from all nodes to create readers for near cache. 
+ for (int i = 0; i < gridCount(); i++) + grid(i).cache(null).get(key); + + store.forceFail(true); + + info("Going to put: " + key); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + grid(0).cache(null).put(key, 2); + + return null; + } + }, IgniteTxRollbackException.class, null); + + checkValue(key, putBefore); + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param key Key. + * @throws Exception If failed. + */ + private void checkTransform(boolean putBefore, final Integer key) throws Exception { + if (putBefore) { + store.forceFail(false); + + info("Put key: " + key); + + grid(0).cache(null).put(key, 1); + } + + // Execute get from all nodes to create readers for near cache. + for (int i = 0; i < gridCount(); i++) + grid(i).cache(null).get(key); + + store.forceFail(true); + + info("Going to transform: " + key); + + Throwable e = GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + grid(0).<Integer, Integer>jcache(null).invoke(key, new EntryProcessor<Integer, Integer, Void>() { + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + e.setValue(2); + + return null; + } + }); + + return null; + } + }, CacheException.class, null); + + assertTrue("Unexpected cause: " + e, e.getCause() instanceof IgniteTxRollbackException); + + checkValue(key, putBefore); + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param keys Keys. + * @throws Exception If failed. + */ + private void checkPutAll(boolean putBefore, Integer ... 
keys) throws Exception { + assert keys.length > 1; + + if (putBefore) { + store.forceFail(false); + + Map<Integer, Integer> m = new HashMap<>(); + + for (Integer key : keys) + m.put(key, 1); + + info("Put data: " + m); + + grid(0).cache(null).putAll(m); + } + + // Execute get from all nodes to create readers for near cache. + for (int i = 0; i < gridCount(); i++) { + for (Integer key : keys) + grid(i).cache(null).get(key); + } + + store.forceFail(true); + + final Map<Integer, Integer> m = new HashMap<>(); + + for (Integer key : keys) + m.put(key, 2); + + info("Going to putAll: " + m); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + grid(0).cache(null).putAll(m); + + return null; + } + }, IgniteTxRollbackException.class, null); + + for (Integer key : m.keySet()) + checkValue(key, putBefore); + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param key Key. + * @throws Exception If failed. + */ + private void checkRemove(boolean putBefore, final Integer key) throws Exception { + if (putBefore) { + store.forceFail(false); + + info("Put key: " + key); + + grid(0).cache(null).put(key, 1); + } + + // Execute get from all nodes to create readers for near cache. + for (int i = 0; i < gridCount(); i++) + grid(i).cache(null).get(key); + + store.forceFail(true); + + info("Going to remove: " + key); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + grid(0).cache(null).remove(key); + + return null; + } + }, IgniteTxRollbackException.class, null); + + checkValue(key, putBefore); + } + + /** + * Generates key of a given type for given node. + * + * @param node Node. + * @param type Key type. + * @return Key. 
+ */ + private Integer keyForNode(ClusterNode node, int type) { + GridCache<Integer, Integer> cache = grid(0).cache(null); + + if (cache.configuration().getCacheMode() == LOCAL) + return ++lastKey; + + if (cache.configuration().getCacheMode() == REPLICATED && type == NOT_PRIMARY_AND_BACKUP) + return ++lastKey; + + for (int key = lastKey + 1; key < (lastKey + 10_000); key++) { + switch (type) { + case NOT_PRIMARY_AND_BACKUP: { + if (!cache.affinity().isPrimaryOrBackup(node, key)) { + lastKey = key; + + return key; + } + + break; + } + + case PRIMARY: { + if (cache.affinity().isPrimary(node, key)) { + lastKey = key; + + return key; + } + + break; + } + + case BACKUP: { + if (cache.affinity().isBackup(node, key)) { + lastKey = key; + + return key; + } + + break; + } + + default: + fail(); + } + } + + throw new IllegalStateException("Failed to find key."); + } + + /** + * + */ + private static class TestStore extends CacheStore<Object, Object> { + /** Fail flag. */ + private volatile boolean fail; + + /** + * @param fail Fail flag. + */ + public void forceFail(boolean fail) { + this.fail = fail; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object load(Object key) { + return null; + } + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, @Nullable Object... 
args) { + if (fail) + throw new CacheLoaderException("Store exception"); + } + + /** {@inheritDoc} */ + @Override public Map<Object, Object> loadAll(Iterable<?> keys) throws CacheLoaderException { + return Collections.emptyMap(); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<?, ?> entry) { + if (fail) + throw new CacheWriterException("Store exception"); + } + + /** {@inheritDoc} */ + @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) { + if (fail) + throw new CacheWriterException("Store exception"); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + if (fail) + throw new CacheWriterException("Store exception"); + } + + /** {@inheritDoc} */ + @Override public void deleteAll(Collection<?> keys) throws CacheWriterException { + if (fail) + throw new CacheWriterException("Store exception"); + } + + /** {@inheritDoc} */ + @Override public void txEnd(boolean commit) { + if (fail && commit) + throw new CacheWriterException("Store exception"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index 5e870eb..bb490f3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@ -20,10 +20,10 @@ package 
org.apache.ignite.internal.processors.cache.datastructures; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.datastructures.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; -import org.gridgain.grid.kernal.processors.cache.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.testframework.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java index 9f2e3a7..74db117 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java @@ -25,7 +25,6 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.cache.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.testframework.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java index d6f57d3..d5a8b44 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java @@ -24,7 +24,6 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.cache.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.testframework.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java index 96076a7..5a53191 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java @@ -25,7 +25,6 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.cache.*; import 
org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.testframework.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java index 57b96b6..e7f2d51 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java @@ -24,7 +24,6 @@ import org.apache.ignite.cache.store.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.cache.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.testframework.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractDistributedByteArrayValuesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractDistributedByteArrayValuesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractDistributedByteArrayValuesSelfTest.java index d0c0eb6..61c4ff2 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractDistributedByteArrayValuesSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractDistributedByteArrayValuesSelfTest.java @@ -20,8 +20,8 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.transactions.*; -import org.gridgain.grid.kernal.processors.cache.*; import org.apache.ignite.spi.swapspace.file.*; import org.jetbrains.annotations.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java index 55c3957..b4ab685 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java @@ -21,7 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.consistenthash.*; import org.apache.ignite.cache.store.*; -import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.typedef.*; import java.util.concurrent.atomic.*; 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEntrySetAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEntrySetAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEntrySetAbstractSelfTest.java index 9c95345..078c61e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEntrySetAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEntrySetAbstractSelfTest.java @@ -18,8 +18,8 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.cache.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.transactions.*; -import org.gridgain.grid.kernal.processors.cache.*; import org.apache.ignite.testframework.*; import java.util.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEntrySetIterationPreloadingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEntrySetIterationPreloadingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEntrySetIterationPreloadingSelfTest.java index 628bea5..d7b84b7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEntrySetIterationPreloadingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEntrySetIterationPreloadingSelfTest.java 
@@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.cache.*; -import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.*; import java.util.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEventAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEventAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEventAbstractTest.java index 0ae38f4..a1e8949 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEventAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEventAbstractTest.java @@ -21,10 +21,10 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; -import org.gridgain.grid.kernal.processors.cache.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*;