IGNITE-109 - Fixed usage of cache store in cross-cache transactions.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/31ce5854
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/31ce5854
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/31ce5854

Branch: refs/heads/ignite-69
Commit: 31ce5854b87adf6bdb462fb08eb9cc6e4dd81465
Parents: c224017
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Tue Jan 27 18:38:17 2015 -0800
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Tue Jan 27 18:38:17 2015 -0800

----------------------------------------------------------------------
 .../processors/cache/GridCacheStoreManager.java |  25 +-
 .../transactions/IgniteTxLocalAdapter.java      |  42 ++-
 .../IgniteCrossCacheTxStoreSelfTest.java        | 288 +++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   1 +
 4 files changed, 351 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ce5854/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java
index 306a8c1..d92db74 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.store.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.lifecycle.*;
 import org.apache.ignite.transactions.*;
@@ -784,6 +785,9 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
 
                 tx.addMeta(SES_ATTR, ses);
             }
+            else
+                // Session cache name may change in cross-cache transaction.
+                ses.cacheName(cctx.name());
         }
         else
             ses = new SessionData(null, cctx.name());
@@ -796,12 +800,14 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
      */
     private static class SessionData {
         /** */
+        @GridToStringExclude
         private final IgniteTx tx;
 
         /** */
-        private final String cacheName;
+        private String cacheName;
 
         /** */
+        @GridToStringInclude
         private Map<Object, Object> props;
 
         /**
@@ -836,6 +842,18 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
         private String cacheName() {
             return cacheName;
         }
+
+        /**
+         * @param cacheName Cache name.
+         */
+        private void cacheName(String cacheName) {
+            this.cacheName = cacheName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SessionData.class, this, "tx", CU.txString(tx));
+        }
     }
 
     /**
@@ -873,6 +891,11 @@ public class GridCacheStoreManager<K, V> extends 
GridCacheManagerAdapter<K, V> {
 
             return ses0 != null ? ses0.cacheName() : null;
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ThreadLocalSession.class, this);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ce5854/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 0ff9627..b0934f1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -472,6 +472,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends 
IgniteTxAdapter<K, V>
                 if (writeEntries != null) {
                     Map<K, IgniteBiTuple<V, GridCacheVersion>> putMap = null;
                     List<K> rmvCol = null;
+                    GridCacheStoreManager<K, V> writeStore = null;
 
                     boolean skipNear = near() && store.writeToStoreFromDht();
 
@@ -496,10 +497,24 @@ public abstract class IgniteTxLocalAdapter<K, V> extends 
IgniteTxAdapter<K, V>
                         if (op == CREATE || op == UPDATE) {
                             // Batch-process all removes if needed.
                             if (rmvCol != null && !rmvCol.isEmpty()) {
-                                store.removeAllFromStore(this, rmvCol);
+                                assert writeStore != null;
+
+                                writeStore.removeAllFromStore(this, rmvCol);
 
                                 // Reset.
                                 rmvCol.clear();
+
+                                writeStore = null;
+                            }
+
+                            // Batch-process puts if the target store manager has changed.
+                            if (writeStore != null && writeStore != 
cacheCtx.store() && putMap != null && !putMap.isEmpty()) {
+                                writeStore.putAllToStore(this, putMap);
+
+                                // Reset.
+                                putMap.clear();
+
+                                writeStore = null;
                             }
 
                             if (intercept) {
@@ -517,14 +532,29 @@ public abstract class IgniteTxLocalAdapter<K, V> extends 
IgniteTxAdapter<K, V>
                                 putMap = new 
LinkedHashMap<>(writeMap().size(), 1.0f);
 
                             putMap.put(key, F.t(val, ver));
+
+                            writeStore = cacheCtx.store();
                         }
                         else if (op == DELETE) {
                             // Batch-process all puts if needed.
                             if (putMap != null && !putMap.isEmpty()) {
-                                store.putAllToStore(this, putMap);
+                                assert writeStore != null;
+
+                                writeStore.putAllToStore(this, putMap);
 
                                 // Reset.
                                 putMap.clear();
+
+                                writeStore = null;
+                            }
+
+                            if (writeStore != null && writeStore != 
cacheCtx.store() && rmvCol != null && !rmvCol.isEmpty()) {
+                                writeStore.removeAllFromStore(this, rmvCol);
+
+                                // Reset.
+                                rmvCol.clear();
+
+                                writeStore = null;
                             }
 
                             if (intercept) {
@@ -541,6 +571,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends 
IgniteTxAdapter<K, V>
                                 rmvCol = new LinkedList<>();
 
                             rmvCol.add(key);
+
+                            writeStore = cacheCtx.store();
                         }
                         else if (log.isDebugEnabled())
                             log.debug("Ignoring NOOP entry for batch store 
commit: " + e);
@@ -548,16 +580,18 @@ public abstract class IgniteTxLocalAdapter<K, V> extends 
IgniteTxAdapter<K, V>
 
                     if (putMap != null && !putMap.isEmpty()) {
                         assert rmvCol == null || rmvCol.isEmpty();
+                        assert writeStore != null;
 
                         // Batch put at the end of transaction.
-                        store.putAllToStore(this, putMap);
+                        writeStore.putAllToStore(this, putMap);
                     }
 
                     if (rmvCol != null && !rmvCol.isEmpty()) {
                         assert putMap == null || putMap.isEmpty();
+                        assert writeStore != null;
 
                         // Batch remove at the end of transaction.
-                        store.removeAllFromStore(this, rmvCol);
+                        writeStore.removeAllFromStore(this, rmvCol);
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ce5854/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
new file mode 100644
index 0000000..7df7619
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import javax.cache.integration.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Tests cache store usage in transactions that span several caches.
+ */
+public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        TestStore firstStore = new TestStore();
+        TestStore secondStore = new TestStore();
+
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration cfg1 = cacheConfiguration("cacheA", firstStore);
+        CacheConfiguration cfg2 = cacheConfiguration("cacheB", firstStore);
+
+        CacheConfiguration cfg3 = cacheConfiguration("cacheC", secondStore);
+        CacheConfiguration cfg4 = cacheConfiguration("cacheD", null);
+
+        cfg.setCacheConfiguration(cfg1, cfg2, cfg3, cfg4);
+
+        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+        commSpi.setSharedMemoryPort(-1);
+
+        cfg.setCommunicationSpi(commSpi);
+
+        return cfg;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param store Cache store.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String cacheName, 
CacheStore<Object, Object> store) {
+        CacheConfiguration cfg = defaultCacheConfiguration();
+
+        cfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY);
+        cfg.setName(cacheName);
+
+        cfg.setBackups(1);
+        if (store != null) {
+            cfg.setCacheStoreFactory(
+                new FactoryBuilder.SingletonFactory<CacheStore<? super Object, 
? super Object>>(store));
+
+            cfg.setWriteThrough(true);
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(4);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        grid(0).cache("cacheA").removeAll();
+        grid(0).cache("cacheB").removeAll();
+        grid(0).cache("cacheC").removeAll();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testWriteThrough() throws Exception {
+        GridEx grid = grid(0);
+
+        TestStore firstStore = 
(TestStore)grid(0).configuration().getCacheConfiguration()[1].getCacheStoreFactory().create();
+
+        Collection<String> evts = firstStore.events();
+
+        try (IgniteTx tx = grid.transactions().txStart()) {
+            GridCache<Object, Object> cacheA = grid.cache("cacheA");
+            GridCache<Object, Object> cacheB = grid.cache("cacheB");
+
+            cacheA.put("1", "1");
+            cacheA.put("2", "2");
+            cacheB.put("1", "1");
+            cacheB.put("2", "2");
+
+            cacheA.remove("3");
+            cacheA.remove("4");
+            cacheB.remove("3");
+            cacheB.remove("4");
+
+            cacheA.put("5", "5");
+            cacheA.remove("6");
+
+            cacheB.put("7", "7");
+
+            tx.commit();
+        }
+
+        assertEqualsCollections(F.asList(
+                "writeAll cacheA 2",
+                "writeAll cacheB 2",
+                "deleteAll cacheA 2",
+                "deleteAll cacheB 2",
+                "write cacheA",
+                "delete cacheA",
+                "write cacheB",
+                "txEnd true"
+            ),
+            evts);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIncompatibleCaches1() throws Exception {
+        GridEx grid = grid(0);
+
+        try (IgniteTx ignored = grid.transactions().txStart()) {
+            GridCache<Object, Object> cacheA = grid.cache("cacheA");
+            GridCache<Object, Object> cacheC = grid.cache("cacheC");
+
+            cacheA.put("1", "2");
+
+            cacheC.put("1", "2");
+
+            fail("Must not allow to enlist caches with different stores to one 
transaction");
+        }
+        catch (IgniteCheckedException e) {
+            assertTrue(e.getMessage().startsWith("Failed to enlist new cache 
to existing transaction"));
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIncompatibleCaches2() throws Exception {
+        GridEx grid = grid(0);
+
+        try (IgniteTx ignored = grid.transactions().txStart()) {
+            GridCache<Object, Object> cacheA = grid.cache("cacheA");
+            GridCache<Object, Object> cacheC = grid.cache("cacheD");
+
+            cacheA.put("1", "2");
+
+            cacheC.put("1", "2");
+
+            fail("Must not allow to enlist caches with different stores to one 
transaction");
+        }
+        catch (IgniteCheckedException e) {
+            assertTrue(e.getMessage().startsWith("Failed to enlist new cache 
to existing transaction"));
+        }
+    }
+
+    /**
+     * @param col1 Collection 1.
+     * @param col2 Collection 2.
+     */
+    private static void assertEqualsCollections(Collection<?> col1, 
Collection<?> col2) {
+        if (col1.size() != col2.size())
+            fail("Collections are not equal:\nExpected:\t" + col1 + 
"\nActual:\t" + col2);
+
+        Iterator<?> it1 = col1.iterator();
+        Iterator<?> it2 = col2.iterator();
+
+        int idx = 0;
+
+        while (it1.hasNext()) {
+            Object item1 = it1.next();
+            Object item2 = it2.next();
+
+            if (!F.eq(item1, item2))
+                fail("Collections are not equal (position " + idx + 
"):\nExpected: " + col1 + "\nActual:   " + col2);
+
+            idx++;
+        }
+    }
+
+    /**
+     * Test cache store that records write, delete and transaction-end callbacks as event strings.
+     */
+    private static class TestStore extends CacheStore<Object, Object> {
+        /** */
+        private Queue<String> evts = new ConcurrentLinkedDeque<>();
+
+        /**
+         * Clears recorded events.
+         */
+        public void clear() {
+            evts.clear();
+        }
+
+        /**
+         * @return Collection of recorded events.
+         */
+        public Collection<String> events() {
+            return evts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, 
@Nullable Object... args)
+            throws CacheLoaderException {
+        }
+
+        @Override public void txEnd(boolean commit) throws 
CacheWriterException {
+            evts.offer("txEnd " + commit);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object load(Object key) throws CacheLoaderException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<Object, Object> loadAll(Iterable<?> keys) throws 
CacheLoaderException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<?, ?> entry) throws 
CacheWriterException {
+            CacheStoreSession ses = session();
+
+            String cacheName = ses.cacheName();
+
+            evts.add("write " + cacheName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) 
throws CacheWriterException {
+            String cacheName = session().cacheName();
+
+            evts.add("writeAll " + cacheName + " " + entries.size());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+            String cacheName = session().cacheName();
+
+            evts.add("delete " + cacheName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void deleteAll(Collection<?> keys) throws 
CacheWriterException {
+            String cacheName = session().cacheName();
+
+            evts.add("deleteAll " + cacheName + " " + keys.size());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/31ce5854/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index c49d113..78a212f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -64,6 +64,7 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheTxInvokeTest.class);
         suite.addTestSuite(IgniteCacheTxNearEnabledInvokeTest.class);
         suite.addTestSuite(IgniteCacheTxLocalInvokeTest.class);
+        suite.addTestSuite(IgniteCrossCacheTxStoreSelfTest.class);
 
         // Affinity tests.
         suite.addTestSuite(GridCachePartitionFairAffinityNodesSelfTest.class);

Reply via email to