IGNITE-45 - Fixed load from store on transactional-lock get
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1a8a51df Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1a8a51df Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1a8a51df Branch: refs/heads/ignite-release-test-no-mod Commit: 1a8a51df6db6fed8198b0d5c4ada1030a634363f Parents: a97444a Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Mon Mar 23 18:39:30 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Mon Mar 23 18:39:30 2015 -0700 ---------------------------------------------------------------------- .../distributed/dht/GridDhtLockFuture.java | 51 ++++++- .../dht/GridDhtTransactionalCacheAdapter.java | 3 +- .../GridCacheTxLoadFromStoreOnLockSelfTest.java | 152 +++++++++++++++++++ .../GridCacheMarshallingNodeJoinSelfTest.java | 18 +-- .../ignite/testsuites/IgniteCacheTestSuite.java | 3 + 5 files changed, 214 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1a8a51df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index c830ddf..949a11c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.processors.dr.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; @@ -681,6 +682,9 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo if (tx != null) cctx.tm().txContext(tx); + if (err.get() == null) + loadMissingFromStore(); + if (super.onDone(success, err.get())) { if (log.isDebugEnabled()) log.debug("Completing future: " + this); @@ -886,10 +890,8 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo * @param req Request. * @param e Entry. * @return Entry. - * @throws IgniteCheckedException If failed. */ - private GridDhtCacheEntry addOwned(GridDhtLockRequest req, GridDhtCacheEntry e) - throws IgniteCheckedException { + private GridDhtCacheEntry addOwned(GridDhtLockRequest req, GridDhtCacheEntry e) { while (true) { try { GridCacheMvccCandidate added = e.candidate(lockVer); @@ -924,6 +926,49 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo } /** + * + */ + private void loadMissingFromStore() { + if (cctx.loadPreviousValue() && cctx.readThrough()) { + final Map<KeyCacheObject, GridDhtCacheEntry> loadMap = new LinkedHashMap<>(); + + final GridCacheVersion ver = version(); + + for (GridDhtCacheEntry entry : entries) { + if (!entry.hasValue()) + loadMap.put(entry.key(), entry); + } + + try { + cctx.store().loadAllFromStore( + tx, + loadMap.keySet(), + new CI2<KeyCacheObject, Object>() { + @Override public void apply(KeyCacheObject key, Object val) { + GridDhtCacheEntry entry0 = loadMap.get(key); + + try { + CacheObject val0 = cctx.toCacheObject(val); + + entry0.initialValue(val0, ver, 0, 0, false, topVer, GridDrType.DR_LOAD); + } + catch (GridCacheEntryRemovedException e) { + assert false : "Should not get removed exception while holding lock on entry " + + "[entry=" + entry0 + ", e=" + e + ']'; + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + }); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + } + + /** * Lock request timeout object. */ private class LockTimeoutObject extends GridTimeoutObjectAdapter { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1a8a51df/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 9fce883..5d4c236 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -48,6 +48,7 @@ import static org.apache.ignite.transactions.TransactionState.*; /** * Base class for transactional DHT caches. */ +@SuppressWarnings("unchecked") public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCacheAdapter<K, V> { /** */ private static final long serialVersionUID = 0L; @@ -994,7 +995,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (ret) val = e.innerGet(tx, /*swap*/true, - /*read-through*/ctx.loadPreviousValue(), + /*read-through*/false, /*fail-fast.*/false, /*unmarshal*/false, /*update-metrics*/true, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1a8a51df/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java new file mode 100644 index 0000000..7b01f0f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/GridCacheTxLoadFromStoreOnLockSelfTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.transactions.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import javax.cache.integration.*; +import java.io.*; + +/** + * + */ +public class GridCacheTxLoadFromStoreOnLockSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.getTransactionConfiguration().setTxSerializableEnabled(true); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(4); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testLoadedValueOneBackup() throws Exception { + checkLoadedValue(1); + } + + /** + * @throws Exception If failed. + */ + public void testLoadedValueNoBackups() throws Exception { + checkLoadedValue(0); + } + + /** + * @throws Exception If failed. + */ + private void checkLoadedValue(int backups) throws Exception { + CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(); + + cacheCfg.setCacheMode(CacheMode.PARTITIONED); + cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC); + cacheCfg.setCacheStoreFactory(new StoreFactory()); + cacheCfg.setReadThrough(true); + cacheCfg.setBackups(backups); + cacheCfg.setLoadPreviousValue(true); + + try (IgniteCache<Integer, Integer> cache = ignite(0).createCache(cacheCfg)) { + for (int i = 0; i < 10; i++) + assertEquals((Integer)i, cache.get(i)); + + cache.removeAll(); + + assertEquals(0, cache.size()); + + for (TransactionConcurrency conc : TransactionConcurrency.values()) { + for (TransactionIsolation iso : TransactionIsolation.values()) { + info("Checking transaction [conc=" + conc + ", iso=" + iso + ']'); + + try (Transaction tx = ignite(0).transactions().txStart(conc, iso)) { + for (int i = 0; i < 10; i++) + assertEquals("Invalid value for transaction [conc=" + conc + ", iso=" + iso + ']', + (Integer)i, cache.get(i)); + + tx.commit(); + } + + cache.removeAll(); + assertEquals(0, cache.size()); + } + } + } + } + + /** + * + */ + private static class StoreFactory implements Factory<CacheStore<? super Integer, ? super Integer>> { + /** {@inheritDoc} */ + @Override public CacheStore<? super Integer, ? super Integer> create() { + return new Store(); + } + } + + /** + * + */ + private static class Store extends CacheStoreAdapter<Integer, Integer> implements Serializable { + /** {@inheritDoc} */ + @Override public Integer load(Integer key) throws CacheLoaderException { + return key; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> e) + throws CacheWriterException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1a8a51df/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java index debfce6..efb9bf1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallingNodeJoinSelfTest.java @@ -22,8 +22,8 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; @@ -100,7 +100,7 @@ public class GridCacheMarshallingNodeJoinSelfTest extends GridCommonAbstractTest }, EventType.EVT_NODE_JOINED); } - multithreadedAsync(new Callable<Object>() { + IgniteInternalFuture<?> oneMoreGrid = multithreadedAsync(new Callable<Object>() { @Override public Object call() throws Exception { allowJoin.await(); @@ -110,21 +110,23 @@ public class GridCacheMarshallingNodeJoinSelfTest extends GridCommonAbstractTest } }, 1); - try (Transaction tx = ignite(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { - IgniteCache<Integer, TestObject> cache = ignite(0).jcache(null); + IgniteCache<Integer, TestObject> cache = ignite(0).jcache(null); + try (Transaction tx = ignite(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { cache.get(0); allowJoin.countDown(); joined.await(); -// Thread.sleep(1000); - - cache.get(1); + assertNotNull(cache.get(1)); tx.commit(); } + + oneMoreGrid.get(); + + assertNotNull(cache.get(1)); } /** @@ -141,8 +143,6 @@ public class GridCacheMarshallingNodeJoinSelfTest extends GridCommonAbstractTest private static class Store extends CacheStoreAdapter<Integer, TestObject> implements Serializable { /** {@inheritDoc} */ @Override public TestObject load(Integer key) throws CacheLoaderException { - U.dumpStack("key=" + key); - return key > 0 ? new TestObject() : null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1a8a51df/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index c699e79..cecc281 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -22,6 +22,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.affinity.fair.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.cache.store.jdbc.*; +import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.context.*; import org.apache.ignite.internal.processors.cache.distributed.*; @@ -410,6 +411,8 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(IgniteDynamicCacheStartSelfTest.class); suite.addTestSuite(IgniteCacheDynamicStopSelfTest.class); + suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class); + return suite; } }