# ignite-1.3.3-p3 added test for cross cache transaction operations
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/05fda0cc Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/05fda0cc Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/05fda0cc Branch: refs/heads/ignite-1258 Commit: 05fda0cceda69ef569b554facca7c2397dbaaad4 Parents: ef14950 Author: sboikov <sboi...@gridgain.com> Authored: Thu Aug 20 13:08:32 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Aug 20 13:08:32 2015 +0300 ---------------------------------------------------------------------- .../cache/CrossCacheTxRandomOperationsTest.java | 490 +++++++++++++++++++ ...gniteCachePutRetryTransactionalSelfTest.java | 15 +- 2 files changed, 500 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/05fda0cc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java new file mode 100644 index 0000000..f3159a3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.fair.*; +import org.apache.ignite.cache.affinity.rendezvous.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.transactions.*; +import org.jetbrains.annotations.*; + +import javax.cache.processor.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; + +/** + * + */ +public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String CACHE1 = "cache1"; + + /** */ + private static final String CACHE2 = "cache2"; + + /** */ + private static final int GRID_CNT = 5; + + /** */ + private static final int KEY_RANGE = 1000; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + if (gridName.equals(getTestGridName(GRID_CNT - 1))) + cfg.setClientMode(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCacheTxOperations() throws Exception { + txOperations(PARTITIONED, FULL_SYNC, false); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCacheTxOperationsPrimarySync() throws Exception { + txOperations(PARTITIONED, PRIMARY_SYNC, false); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCacheTxOperationsFairAffinity() throws Exception { + txOperations(PARTITIONED, FULL_SYNC, true); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCacheTxOperationsReplicated() throws Exception { + txOperations(REPLICATED, FULL_SYNC, false); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCacheTxOperationsReplicatedPrimarySync() throws Exception { + txOperations(REPLICATED, PRIMARY_SYNC, false); + } + + /** + * @param name Cache name. + * @param cacheMode Cache mode. + * @param writeSync Write synchronization mode. + * @param fairAff If {@code true} uses {@link FairAffinityFunction}, otherwise {@link RendezvousAffinityFunction}. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration(String name, + CacheMode cacheMode, + CacheWriteSynchronizationMode writeSync, + boolean fairAff) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(name); + ccfg.setCacheMode(cacheMode); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(writeSync); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(1); + + ccfg.setAffinity(fairAff ? new FairAffinityFunction() : new RendezvousAffinityFunction()); + + return ccfg; + } + + /** + * @param cacheMode Cache mode. + * @param writeSync Write synchronization mode. + * @param fairAff If {@code true} uses {@link FairAffinityFunction}, otherwise {@link RendezvousAffinityFunction}. + * @throws Exception If failed. + */ + private void txOperations(CacheMode cacheMode, + CacheWriteSynchronizationMode writeSync, + boolean fairAff) throws Exception { + Ignite ignite = ignite(0); + + try { + ignite.createCache(cacheConfiguration(CACHE1, cacheMode, writeSync, fairAff)); + ignite.createCache(cacheConfiguration(CACHE2, cacheMode, writeSync, fairAff)); + + txOperations(PESSIMISTIC, REPEATABLE_READ, false); + txOperations(PESSIMISTIC, REPEATABLE_READ, true); + + txOperations(OPTIMISTIC, REPEATABLE_READ, false); + txOperations(OPTIMISTIC, REPEATABLE_READ, true); + } + finally { + ignite.destroyCache(CACHE1); + ignite.destroyCache(CACHE2); + } + } + + /** + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @param client If {@code true} uses client node. + */ + private void txOperations(TransactionConcurrency concurrency, + TransactionIsolation isolation, + boolean client) { + Map<TestKey, TestValue> expData1 = new HashMap<>(); + Map<TestKey, TestValue> expData2 = new HashMap<>(); + + Ignite ignite = client ? ignite(GRID_CNT - 1) : ignite(0); + + assertEquals(client, (boolean)ignite.configuration().isClientMode()); + + IgniteCache<TestKey, TestValue> cache1 = ignite.cache(CACHE1); + IgniteCache<TestKey, TestValue> cache2 = ignite.cache(CACHE2); + + assertNotNull(cache1); + assertNotNull(cache2); + assertNotSame(cache1, cache2); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + long seed = System.currentTimeMillis(); + + log.info("Test tx operations [concurrency=" + concurrency + + ", isolation=" + isolation + + ", client=" + client + + ", seed=" + seed + ']'); + + IgniteTransactions txs = ignite.transactions(); + + List<TestKey> keys = new ArrayList<>(); + + for (int i = 0; i < KEY_RANGE; i++) + keys.add(new TestKey(i)); + + for (int i = 0; i < 10_000; i++) { + if (i % 100 == 0) + log.info("Iteration: " + i); + + boolean rollback = i % 10 == 0; + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + cacheOperation(expData1, rnd, cache1, rollback); + cacheOperation(expData2, rnd, cache2, rollback); + + if (rollback) + tx.rollback(); + else + tx.commit(); + } + } + + List<IgniteCache<TestKey, TestValue>> caches1 = new ArrayList<>(); + List<IgniteCache<TestKey, TestValue>> caches2 = new ArrayList<>(); + + for (int i = 0; i < GRID_CNT; i++) { + caches1.add(ignite(i).<TestKey, TestValue>cache(CACHE1)); + caches2.add(ignite(i).<TestKey, TestValue>cache(CACHE2)); + } + + checkData(caches1, keys, expData1); + checkData(caches2, keys, expData2); + + cache1.removeAll(); + cache2.removeAll(); + } + + /** + * @param caches Caches. + * @param keys Keys. + * @param expData Expected data. + */ + private void checkData(List<IgniteCache<TestKey, TestValue>> caches, + List<TestKey> keys, Map<TestKey, TestValue> expData) { + for (IgniteCache<TestKey, TestValue> cache : caches) { + for (TestKey key : keys) { + TestValue val = cache.get(key); + TestValue expVal = expData.get(key); + + assertEquals(expVal, val); + } + } + } + + /** + * @param expData Expected cache data. + * @param rnd Random. + * @param cache Cache. + * @param willRollback {@code True} if will rollback transaction. + */ + private void cacheOperation( + Map<TestKey, TestValue> expData, + ThreadLocalRandom rnd, + IgniteCache<TestKey, TestValue> cache, + boolean willRollback) { + TestKey key = key(rnd); + TestValue val = new TestValue(rnd.nextLong()); + + switch (rnd.nextInt(8)) { + case 0: { + cache.put(key, val); + + if (!willRollback) + expData.put(key, val); + + break; + } + + case 1: { + TestValue oldVal = cache.getAndPut(key, val); + + TestValue expOld = expData.get(key); + + assertEquals(expOld, oldVal); + + if (!willRollback) + expData.put(key, val); + + break; + } + + case 2: { + boolean rmv = cache.remove(key); + + assertEquals(expData.containsKey(key), rmv); + + if (!willRollback) + expData.remove(key); + + break; + } + + case 3: { + TestValue oldVal = cache.getAndRemove(key); + + TestValue expOld = expData.get(key); + + assertEquals(expOld, oldVal); + + if (!willRollback) + expData.remove(key); + + break; + } + + case 4: { + boolean put = cache.putIfAbsent(key, val); + + boolean expPut = !expData.containsKey(key); + + assertEquals(expPut, put); + + if (expPut && !willRollback) + expData.put(key, val); + + break; + } + + case 5: { + TestValue oldVal = cache.invoke(key, new TestEntryProcessor(val.value())); + TestValue expOld = expData.get(key); + + assertEquals(expOld, oldVal); + + if (!willRollback) + expData.put(key, val); + + break; + } + + case 6: { + TestValue oldVal = cache.invoke(key, new TestEntryProcessor(null)); + TestValue expOld = expData.get(key); + + assertEquals(expOld, oldVal); + + break; + } + + case 7: { + TestValue oldVal = cache.get(key); + TestValue expOld = expData.get(key); + + assertEquals(expOld, oldVal); + + break; + } + + default: + assert false; + } + } + + /** + * @param rnd Random. + * @return Key. + */ + private TestKey key(ThreadLocalRandom rnd) { + return new TestKey(rnd.nextInt(KEY_RANGE)); + } + + /** + * + */ + private static class TestKey implements Serializable { + /** */ + private long key; + + /** + * @param key Key. + */ + public TestKey(long key) { + this.key = key; + } + + /** + * @return Key. + */ + public long key() { + return key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestKey other = (TestKey)o; + + return key == other.key; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return (int)(key ^ (key >>> 32)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestKey.class, this); + } + } + + /** + * + */ + private static class TestValue implements Serializable { + /** */ + private long val; + + /** + * @param val Value. + */ + public TestValue(long val) { + this.val = val; + } + + /** + * @return Value. + */ + public long value() { + return val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestValue other = (TestValue)o; + + return val == other.val; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestValue.class, this); + } + } + + /** + * + */ + private static class TestEntryProcessor implements CacheEntryProcessor<TestKey, TestValue, TestValue> { + /** */ + private Long val; + + /** + * @param val Value. + */ + public TestEntryProcessor(@Nullable Long val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public TestValue process(MutableEntry<TestKey, TestValue> e, Object... args) { + TestValue old = e.getValue(); + + if (val != null) + e.setValue(new TestValue(val)); + + return old; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/05fda0cc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java index 9a6bb31..9c4446d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java @@ -61,8 +61,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr IgniteAtomicLong atomic = ignite(0).atomicLong("TestAtomic", 0, true); IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { - @Override - public Object call() throws Exception { + @Override public Object call() throws Exception { while (!finished.get()) { stopGrid(3); @@ -157,7 +156,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr * @param ignite Ignite instance. * @param clo Closure. * @return Result of closure execution. - * @throws Exception + * @throws Exception If failed. */ private <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception { while (true) { @@ -213,10 +212,16 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public Void call() throws Exception { - ((IgniteCache<String, String>)cache).put("key-" + base + "-" + i, "value-" + i); + String key1 = "key-" + base + "-" + i; + String key2 = "key-" + base; + + assert key1.compareTo(key2) > 0; + + ((IgniteCache<String, String>)cache).put(key1, "value-" + i); - ((IgniteCache<String, Set<String>>)cache).invoke("key-" + base, new AddEntryProcessor("value-" + i)); + ((IgniteCache<String, Set<String>>)cache).invoke(key2, new AddEntryProcessor("value-" + i)); return null; }