IGNITE-109 - Fixed usage of cache store in cross-cache transactions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/31ce5854 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/31ce5854 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/31ce5854 Branch: refs/heads/ignite-69 Commit: 31ce5854b87adf6bdb462fb08eb9cc6e4dd81465 Parents: c224017 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Tue Jan 27 18:38:17 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Tue Jan 27 18:38:17 2015 -0800 ---------------------------------------------------------------------- .../processors/cache/GridCacheStoreManager.java | 25 +- .../transactions/IgniteTxLocalAdapter.java | 42 ++- .../IgniteCrossCacheTxStoreSelfTest.java | 288 +++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 1 + 4 files changed, 351 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ce5854/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java index 306a8c1..d92db74 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java @@ -22,6 +22,7 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.lang.*; import org.apache.ignite.lifecycle.*; import org.apache.ignite.transactions.*; @@ -784,6 +785,9 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { tx.addMeta(SES_ATTR, ses); } + else + // Session cache name may change in cross-cache transaction. + ses.cacheName(cctx.name()); } else ses = new SessionData(null, cctx.name()); @@ -796,12 +800,14 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { */ private static class SessionData { /** */ + @GridToStringExclude private final IgniteTx tx; /** */ - private final String cacheName; + private String cacheName; /** */ + @GridToStringInclude private Map<Object, Object> props; /** @@ -836,6 +842,18 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { private String cacheName() { return cacheName; } + + /** + * @param cacheName Cache name. + */ + private void cacheName(String cacheName) { + this.cacheName = cacheName; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SessionData.class, this, "tx", CU.txString(tx)); + } } /** @@ -873,6 +891,11 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { return ses0 != null ? ses0.cacheName() : null; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ThreadLocalSession.class, this); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ce5854/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 0ff9627..b0934f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -472,6 +472,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (writeEntries != null) { Map<K, IgniteBiTuple<V, GridCacheVersion>> putMap = null; List<K> rmvCol = null; + GridCacheStoreManager<K, V> writeStore = null; boolean skipNear = near() && store.writeToStoreFromDht(); @@ -496,10 +497,24 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (op == CREATE || op == UPDATE) { // Batch-process all removes if needed. if (rmvCol != null && !rmvCol.isEmpty()) { - store.removeAllFromStore(this, rmvCol); + assert writeStore != null; + + writeStore.removeAllFromStore(this, rmvCol); // Reset. rmvCol.clear(); + + writeStore = null; + } + + // Batch-process puts if cache ID has changed. + if (writeStore != null && writeStore != cacheCtx.store() && putMap != null && !putMap.isEmpty()) { + writeStore.putAllToStore(this, putMap); + + // Reset. + putMap.clear(); + + writeStore = null; } if (intercept) { @@ -517,14 +532,29 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> putMap = new LinkedHashMap<>(writeMap().size(), 1.0f); putMap.put(key, F.t(val, ver)); + + writeStore = cacheCtx.store(); } else if (op == DELETE) { // Batch-process all puts if needed. if (putMap != null && !putMap.isEmpty()) { - store.putAllToStore(this, putMap); + assert writeStore != null; + + writeStore.putAllToStore(this, putMap); // Reset. putMap.clear(); + + writeStore = null; + } + + if (writeStore != null && writeStore != cacheCtx.store() && rmvCol != null && !rmvCol.isEmpty()) { + writeStore.removeAllFromStore(this, rmvCol); + + // Reset. + rmvCol.clear(); + + writeStore = null; } if (intercept) { @@ -541,6 +571,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> rmvCol = new LinkedList<>(); rmvCol.add(key); + + writeStore = cacheCtx.store(); } else if (log.isDebugEnabled()) log.debug("Ignoring NOOP entry for batch store commit: " + e); @@ -548,16 +580,18 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (putMap != null && !putMap.isEmpty()) { assert rmvCol == null || rmvCol.isEmpty(); + assert writeStore != null; // Batch put at the end of transaction. - store.putAllToStore(this, putMap); + writeStore.putAllToStore(this, putMap); } if (rmvCol != null && !rmvCol.isEmpty()) { assert putMap == null || putMap.isEmpty(); + assert writeStore != null; // Batch remove at the end of transaction. - store.removeAllFromStore(this, rmvCol); + writeStore.removeAllFromStore(this, rmvCol); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ce5854/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java new file mode 100644 index 0000000..7df7619 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.transactions.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import javax.cache.integration.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * + */ +public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + TestStore firstStore = new TestStore(); + TestStore secondStore = new TestStore(); + + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration cfg1 = cacheConfiguration("cacheA", firstStore); + CacheConfiguration cfg2 = cacheConfiguration("cacheB", firstStore); + + CacheConfiguration cfg3 = cacheConfiguration("cacheC", secondStore); + CacheConfiguration cfg4 = cacheConfiguration("cacheD", null); + + cfg.setCacheConfiguration(cfg1, cfg2, cfg3, cfg4); + + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); + + return cfg; + } + + /** + * @param cacheName Cache name. + * @param store Cache store. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration(String cacheName, CacheStore<Object, Object> store) { + CacheConfiguration cfg = defaultCacheConfiguration(); + + cfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY); + cfg.setName(cacheName); + + cfg.setBackups(1); + if (store != null) { + cfg.setCacheStoreFactory( + new FactoryBuilder.SingletonFactory<CacheStore<? super Object, ? super Object>>(store)); + + cfg.setWriteThrough(true); + } + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(4); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + grid(0).cache("cacheA").removeAll(); + grid(0).cache("cacheB").removeAll(); + grid(0).cache("cacheC").removeAll(); + } + + /** + * @throws Exception If failed. + */ + public void testWriteThrough() throws Exception { + GridEx grid = grid(0); + + TestStore firstStore = (TestStore)grid(0).configuration().getCacheConfiguration()[1].getCacheStoreFactory().create(); + + Collection<String> evts = firstStore.events(); + + try (IgniteTx tx = grid.transactions().txStart()) { + GridCache<Object, Object> cacheA = grid.cache("cacheA"); + GridCache<Object, Object> cacheB = grid.cache("cacheB"); + + cacheA.put("1", "1"); + cacheA.put("2", "2"); + cacheB.put("1", "1"); + cacheB.put("2", "2"); + + cacheA.remove("3"); + cacheA.remove("4"); + cacheB.remove("3"); + cacheB.remove("4"); + + cacheA.put("5", "5"); + cacheA.remove("6"); + + cacheB.put("7", "7"); + + tx.commit(); + } + + assertEqualsCollections(F.asList( + "writeAll cacheA 2", + "writeAll cacheB 2", + "deleteAll cacheA 2", + "deleteAll cacheB 2", + "write cacheA", + "delete cacheA", + "write cacheB", + "txEnd true" + ), + evts); + } + + /** + * @throws Exception If failed. + */ + public void testIncompatibleCaches1() throws Exception { + GridEx grid = grid(0); + + try (IgniteTx ignored = grid.transactions().txStart()) { + GridCache<Object, Object> cacheA = grid.cache("cacheA"); + GridCache<Object, Object> cacheC = grid.cache("cacheC"); + + cacheA.put("1", "2"); + + cacheC.put("1", "2"); + + fail("Must not allow to enlist caches with different stores to one transaction"); + } + catch (IgniteCheckedException e) { + assertTrue(e.getMessage().startsWith("Failed to enlist new cache to existing transaction")); + } + } + + /** + * @throws Exception If failed. + */ + public void testIncompatibleCaches2() throws Exception { + GridEx grid = grid(0); + + try (IgniteTx ignored = grid.transactions().txStart()) { + GridCache<Object, Object> cacheA = grid.cache("cacheA"); + GridCache<Object, Object> cacheC = grid.cache("cacheD"); + + cacheA.put("1", "2"); + + cacheC.put("1", "2"); + + fail("Must not allow to enlist caches with different stores to one transaction"); + } + catch (IgniteCheckedException e) { + assertTrue(e.getMessage().startsWith("Failed to enlist new cache to existing transaction")); + } + } + + /** + * @param col1 Collection 1. + * @param col2 Collection 2. + */ + private static void assertEqualsCollections(Collection<?> col1, Collection<?> col2) { + if (col1.size() != col2.size()) + fail("Collections are not equal:\nExpected:\t" + col1 + "\nActual:\t" + col2); + + Iterator<?> it1 = col1.iterator(); + Iterator<?> it2 = col2.iterator(); + + int idx = 0; + + while (it1.hasNext()) { + Object item1 = it1.next(); + Object item2 = it2.next(); + + if (!F.eq(item1, item2)) + fail("Collections are not equal (position " + idx + "):\nExpected: " + col1 + "\nActual: " + col2); + + idx++; + } + } + + /** + * + */ + private static class TestStore extends CacheStore<Object, Object> { + /** */ + private Queue<String> evts = new ConcurrentLinkedDeque<>(); + + /** + * + */ + public void clear() { + evts.clear(); + } + + /** + * @return Collection of recorded events. + */ + public Collection<String> events() { + return evts; + } + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, @Nullable Object... args) + throws CacheLoaderException { + } + + @Override public void txEnd(boolean commit) throws CacheWriterException { + evts.offer("txEnd " + commit); + } + + /** {@inheritDoc} */ + @Override public Object load(Object key) throws CacheLoaderException { + return null; + } + + /** {@inheritDoc} */ + @Override public Map<Object, Object> loadAll(Iterable<?> keys) throws CacheLoaderException { + return null; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException { + CacheStoreSession ses = session(); + + String cacheName = ses.cacheName(); + + evts.add("write " + cacheName); + } + + /** {@inheritDoc} */ + @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) throws CacheWriterException { + String cacheName = session().cacheName(); + + evts.add("writeAll " + cacheName + " " + entries.size()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + String cacheName = session().cacheName(); + + evts.add("delete " + cacheName); + } + + /** {@inheritDoc} */ + @Override public void deleteAll(Collection<?> keys) throws CacheWriterException { + String cacheName = session().cacheName(); + + evts.add("deleteAll " + cacheName + " " + keys.size()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ce5854/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index c49d113..78a212f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -64,6 +64,7 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheTxInvokeTest.class); suite.addTestSuite(IgniteCacheTxNearEnabledInvokeTest.class); suite.addTestSuite(IgniteCacheTxLocalInvokeTest.class); + suite.addTestSuite(IgniteCrossCacheTxStoreSelfTest.class); // Affinity tests. suite.addTestSuite(GridCachePartitionFairAffinityNodesSelfTest.class);