IGNITE-45 - Fixed load from store on transactional-lock get

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1a8a51df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1a8a51df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1a8a51df

Branch: refs/heads/ignite-45-gridcache
Commit: 1a8a51df6db6fed8198b0d5c4ada1030a634363f
Parents: a97444a
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Mon Mar 23 18:39:30 2015 -0700
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Mon Mar 23 18:39:30 2015 -0700

----------------------------------------------------------------------
 .../distributed/dht/GridDhtLockFuture.java      |  51 ++++++-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   3 +-
 .../GridCacheTxLoadFromStoreOnLockSelfTest.java | 152 +++++++++++++++++++
 .../GridCacheMarshallingNodeJoinSelfTest.java   |  18 +--
 .../ignite/testsuites/IgniteCacheTestSuite.java |   3 +
 5 files changed, 214 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1a8a51df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index c830ddf..949a11c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.processors.dr.*;
 import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
@@ -681,6 +682,9 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
         if (tx != null)
             cctx.tm().txContext(tx);
 
+        if (err.get() == null)
+            loadMissingFromStore();
+
         if (super.onDone(success, err.get())) {
             if (log.isDebugEnabled())
                 log.debug("Completing future: " + this);
@@ -886,10 +890,8 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
      * @param req Request.
      * @param e Entry.
      * @return Entry.
-     * @throws IgniteCheckedException If failed.
      */
-    private GridDhtCacheEntry addOwned(GridDhtLockRequest req, GridDhtCacheEntry e)
-        throws IgniteCheckedException {
+    private GridDhtCacheEntry addOwned(GridDhtLockRequest req, GridDhtCacheEntry e) {
         while (true) {
             try {
                 GridCacheMvccCandidate added = e.candidate(lockVer);
@@ -924,6 +926,49 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
     }
 
     /**
+     *
+     */
+    private void loadMissingFromStore() {
+        if (cctx.loadPreviousValue() && cctx.readThrough()) {
+            final Map<KeyCacheObject, GridDhtCacheEntry> loadMap = new LinkedHashMap<>();
+
+            final GridCacheVersion ver = version();
+
+            for (GridDhtCacheEntry entry : entries) {
+                if (!entry.hasValue())
+                    loadMap.put(entry.key(), entry);
+            }
+
+            try {
+                cctx.store().loadAllFromStore(
+                    tx,
+                    loadMap.keySet(),
+                    new CI2<KeyCacheObject, Object>() {
+                        @Override public void apply(KeyCacheObject key, Object val) {
+                            GridDhtCacheEntry entry0 = loadMap.get(key);
+
+                            try {
+                                CacheObject val0 = cctx.toCacheObject(val);
+
+                                entry0.initialValue(val0, ver, 0, 0, false, topVer, GridDrType.DR_LOAD);
+                            }
+                            catch (GridCacheEntryRemovedException e) {
+                                assert false : "Should not get removed exception while holding lock on entry " +
+                                    "[entry=" + entry0 + ", e=" + e + ']';
+                            }
+                            catch (IgniteCheckedException e) {
+                                onDone(e);
+                            }
+                        }
+                    });
+            }
+            catch (IgniteCheckedException e) {
+                onDone(e);
+            }
+        }
+    }
+
+    /**
      * Lock request timeout object.
      */
     private class LockTimeoutObject extends GridTimeoutObjectAdapter {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1a8a51df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 9fce883..5d4c236 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -48,6 +48,7 @@ import static org.apache.ignite.transactions.TransactionState.*;
 /**
  * Base class for transactional DHT caches.
  */
+@SuppressWarnings("unchecked")
 public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCacheAdapter<K, V> {
     /** */
     private static final long serialVersionUID = 0L;
@@ -994,7 +995,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                                     if (ret)
                                         val = e.innerGet(tx,
                                             /*swap*/true,
-                                            /*read-through*/ctx.loadPreviousValue(),
+                                            /*read-through*/false,
                                             /*fail-fast.*/false,
                                             /*unmarshal*/false,
                                             /*update-metrics*/true,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1a8a51df/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java
new file mode 100644
index 0000000..7b01f0f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import javax.cache.integration.*;
+import java.io.*;
+
+/**
+ *
+ */
+public class GridCacheTxLoadFromStoreOnLockSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.getTransactionConfiguration().setTxSerializableEnabled(true);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(4);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLoadedValueOneBackup() throws Exception {
+        checkLoadedValue(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLoadedValueNoBackups() throws Exception {
+        checkLoadedValue(0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkLoadedValue(int backups) throws Exception {
+        CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
+
+        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+        cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cacheCfg.setCacheStoreFactory(new StoreFactory());
+        cacheCfg.setReadThrough(true);
+        cacheCfg.setBackups(backups);
+        cacheCfg.setLoadPreviousValue(true);
+
+        try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cacheCfg)) {
+            for (int i = 0; i < 10; i++)
+                assertEquals((Integer)i, cache.get(i));
+
+            cache.removeAll();
+
+            assertEquals(0, cache.size());
+
+            for (TransactionConcurrency conc : TransactionConcurrency.values()) {
+                for (TransactionIsolation iso : TransactionIsolation.values()) {
+                    info("Checking transaction [conc=" + conc + ", iso=" + iso + ']');
+
+                    try (Transaction tx = ignite(0).transactions().txStart(conc, iso)) {
+                        for (int i = 0; i < 10; i++)
+                            assertEquals("Invalid value for transaction [conc=" + conc + ", iso=" + iso + ']',
+                                (Integer)i, cache.get(i));
+
+                        tx.commit();
+                    }
+
+                    cache.removeAll();
+                    assertEquals(0, cache.size());
+                }
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    private static class StoreFactory implements Factory<CacheStore<? super Integer, ? super Integer>> {
+        /** {@inheritDoc} */
+        @Override public CacheStore<? super Integer, ? super Integer> create() {
+            return new Store();
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Store extends CacheStoreAdapter<Integer, Integer> implements Serializable {
+        /** {@inheritDoc} */
+        @Override public Integer load(Integer key) throws CacheLoaderException {
+            return key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> e)
+            throws CacheWriterException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1a8a51df/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java
index debfce6..efb9bf1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java
@@ -22,8 +22,8 @@ import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.store.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -100,7 +100,7 @@ public class GridCacheMarshallingNodeJoinSelfTest extends GridCommonAbstractTest
             }, EventType.EVT_NODE_JOINED);
         }
 
-        multithreadedAsync(new Callable<Object>() {
+        IgniteInternalFuture<?> oneMoreGrid = multithreadedAsync(new Callable<Object>() {
             @Override public Object call() throws Exception {
                 allowJoin.await();
 
@@ -110,21 +110,23 @@ public class GridCacheMarshallingNodeJoinSelfTest extends GridCommonAbstractTest
             }
         }, 1);
 
-        try (Transaction tx = ignite(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
-            IgniteCache<Integer, TestObject> cache = ignite(0).jcache(null);
+        IgniteCache<Integer, TestObject> cache = ignite(0).jcache(null);
 
+        try (Transaction tx = ignite(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
             cache.get(0);
 
             allowJoin.countDown();
 
             joined.await();
 
-//            Thread.sleep(1000);
-
-            cache.get(1);
+            assertNotNull(cache.get(1));
 
             tx.commit();
         }
+
+        oneMoreGrid.get();
+
+        assertNotNull(cache.get(1));
     }
 
     /**
@@ -141,8 +143,6 @@ public class GridCacheMarshallingNodeJoinSelfTest extends GridCommonAbstractTest
     private static class Store extends CacheStoreAdapter<Integer, TestObject> implements Serializable {
         /** {@inheritDoc} */
         @Override public TestObject load(Integer key) throws CacheLoaderException {
-            U.dumpStack("key=" + key);
-
             return key > 0 ? new TestObject() : null;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1a8a51df/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index c699e79..cecc281 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -22,6 +22,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.affinity.fair.*;
 import org.apache.ignite.cache.store.*;
 import org.apache.ignite.cache.store.jdbc.*;
+import org.apache.ignite.internal.processors.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.context.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
@@ -410,6 +411,8 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(IgniteDynamicCacheStartSelfTest.class);
         suite.addTestSuite(IgniteCacheDynamicStopSelfTest.class);
 
+        suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class);
+
         return suite;
     }
 }

Reply via email to