IGNITE-1265 - Entry processor must always have the correct cache value.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5065a1ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5065a1ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5065a1ec

Branch: refs/heads/master
Commit: 5065a1eccb3d71b2573d37bb6ff2c78a1bbc107c
Parents: ccaa2b2
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Tue Aug 18 19:35:50 2015 -0700
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Tue Aug 18 19:35:50 2015 -0700

----------------------------------------------------------------------
 .../dht/GridClientPartitionTopology.java        |  20 +++
 .../dht/GridDhtPartitionTopology.java           |   7 +
 .../dht/GridDhtPartitionTopologyImpl.java       |  20 +++
 .../cache/distributed/dht/GridDhtTxLocal.java   |   4 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java | 136 +++++++++++++++++--
 .../cache/transactions/IgniteTxEntry.java       |  18 +++
 .../IgniteCacheEntryProcessorNodeJoinTest.java  |  54 ++++----
 .../cache/IgniteCacheInvokeReadThroughTest.java |   2 +-
 8 files changed, 223 insertions(+), 38 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index c3f3e7f..531678e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -331,6 +331,26 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) {
+        lock.readLock().lock();
+
+        try {
+            GridDhtPartitionMap partMap = node2part.get(nodeId);
+
+            if (partMap != null) {
+                GridDhtPartitionState state = partMap.get(part);
+
+                return state == null ? EVICTED : state;
+            }
+
+            return EVICTED;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
         lock.readLock().lock();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index c551fb3..7b08510 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -129,6 +129,13 @@ public interface GridDhtPartitionTopology {
     public GridDhtPartitionMap localPartitionMap();
 
     /**
+     * @param nodeId Node ID.
+     * @param part Partition.
+     * @return Partition state.
+     */
+    public GridDhtPartitionState partitionState(UUID nodeId, int part);
+
+    /**
      * @return Current update sequence.
      */
     public long updateSequence();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index de7f876..f356138 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -614,6 +614,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) {
+        lock.readLock().lock();
+
+        try {
+            GridDhtPartitionMap partMap = node2part.get(nodeId);
+
+            if (partMap != null) {
+                GridDhtPartitionState state = partMap.get(part);
+
+                return state == null ? EVICTED : state;
+            }
+
+            return EVICTED;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
         Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 6a72c89..7da6e07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -363,8 +363,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
      * @return Future that will be completed when locks are acquired.
      */
     public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
-        @Nullable Iterable<IgniteTxEntry> reads,
-        @Nullable Iterable<IgniteTxEntry> writes,
+        @Nullable Collection<IgniteTxEntry> reads,
+        @Nullable Collection<IgniteTxEntry> writes,
         Map<IgniteTxKey, GridCacheVersion> verMap,
         long msgId,
         IgniteUuid nearMiniId,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 2b7e1bc..ad1023f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -135,6 +135,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     /** Keys that should be locked. */
     private GridConcurrentHashSet<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
 
+    /** Force keys future for correct transforms. */
+    private IgniteInternalFuture<?> forceKeysFut;
+
     /** Locks ready flag. */
     private volatile boolean locksReady;
 
@@ -291,7 +294,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
                 boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters());
 
-                if (hasFilters || retVal || txEntry.op() == GridCacheOperation.DELETE) {
+                if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) {
                     cached.unswap(retVal);
 
                     boolean readThrough = (retVal || hasFilters) &&
@@ -312,7 +315,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         null,
                         null);
 
-                    if (retVal) {
+                    if (retVal || txEntry.op() == TRANSFORM) {
                         if (!F.isEmpty(txEntry.entryProcessors())) {
                             invoke = true;
 
@@ -339,6 +342,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                                 }
                             }
 
+                            txEntry.entryProcessorCalculatedValue(val);
+
                             if (err != null || procRes != null)
                                 ret.addEntryProcessResult(txEntry.context(), key, null, procRes, err);
                             else
@@ -362,7 +367,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                             ret.success(false);
                     }
                     else
-                        ret.success(txEntry.op() != GridCacheOperation.DELETE || cached.hasValue());
+                        ret.success(txEntry.op() != DELETE || cached.hasValue());
                 }
             }
             catch (IgniteCheckedException e) {
@@ -466,7 +471,25 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      */
     private boolean mapIfLocked() {
         if (checkLocks()) {
-            prepare0();
+            if (!mapped.compareAndSet(false, true))
+                return false;
+
+            if (forceKeysFut == null || (forceKeysFut.isDone() && forceKeysFut.error() == null))
+                prepare0();
+            else {
+                forceKeysFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> f) {
+                        try {
+                            f.get();
+
+                            prepare0();
+                        }
+                        catch (IgniteCheckedException e) {
+                            onError(e);
+                        }
+                    }
+                });
+            }
 
             return true;
         }
@@ -709,7 +732,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      * @param writes Write entries.
      * @param txNodes Transaction nodes mapping.
      */
-    public void prepare(Iterable<IgniteTxEntry> reads, Iterable<IgniteTxEntry> writes,
+    @SuppressWarnings("TypeMayBeWeakened")
+    public void prepare(Collection<IgniteTxEntry> reads, Collection<IgniteTxEntry> writes,
         Map<UUID, Collection<UUID>> txNodes) {
         if (tx.empty()) {
             tx.setRollbackOnly();
@@ -721,6 +745,15 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         this.writes = writes;
         this.txNodes = txNodes;
 
+        if (!F.isEmpty(writes)) {
+            Map<Integer, Collection<KeyCacheObject>> forceKeys = null;
+
+            for (IgniteTxEntry entry : writes)
+                forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
+
+            forceKeysFut = forceRebalanceKeys(forceKeys);
+        }
+
         readyLocks();
 
         mapIfLocked();
@@ -735,12 +768,75 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     }
 
     /**
+     * Checks if this transaction needs previous value for the given tx entry. Will use passed in map to store
+     * required key or will create new map if passed in map is {@code null}.
      *
+     * @param e TX entry.
+     * @param map Map with needed preload keys.
+     * @return Map if it was created.
      */
-    private void prepare0() {
-        if (!mapped.compareAndSet(false, true))
-            return;
+    private Map<Integer, Collection<KeyCacheObject>> checkNeedRebalanceKeys(
+        IgniteTxEntry e,
+        Map<Integer, Collection<KeyCacheObject>> map
+    ) {
+        if (retVal || !F.isEmpty(e.entryProcessors())) {
+            if (map == null)
+                map = new HashMap<>();
+
+            Collection<KeyCacheObject> keys = map.get(e.cacheId());
+
+            if (keys == null) {
+                keys = new ArrayList<>();
+
+                map.put(e.cacheId(), keys);
+            }
+
+            keys.add(e.key());
+        }
+
+        return map;
+    }
+
+    private IgniteInternalFuture<Object> forceRebalanceKeys(Map<Integer, Collection<KeyCacheObject>> keysMap) {
+        if (F.isEmpty(keysMap))
+            return null;
+
+        GridCompoundFuture<Object, Object> compFut = null;
+        IgniteInternalFuture<Object> lastForceFut = null;
 
+        for (Map.Entry<Integer, Collection<KeyCacheObject>> entry : keysMap.entrySet()) {
+            if (lastForceFut != null && compFut == null) {
+                compFut = new GridCompoundFuture();
+
+                compFut.add(lastForceFut);
+            }
+
+            int cacheId = entry.getKey();
+
+            Collection<KeyCacheObject> keys = entry.getValue();
+
+            lastForceFut = cctx.cacheContext(cacheId).preloader().request(keys, tx.topologyVersion());
+
+            if (compFut != null)
+                compFut.add(lastForceFut);
+        }
+
+        if (compFut != null) {
+            compFut.markInitialized();
+
+            return compFut;
+        }
+        else {
+            assert lastForceFut != null;
+
+            return lastForceFut;
+        }
+    }
+
+    /**
+     *
+     */
+    private void prepare0() {
         try {
             // We are holding transaction-level locks for entries here, so we can get next write version.
             onEntriesLocked();
@@ -957,7 +1053,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     private boolean map(
         IgniteTxEntry entry,
         Map<UUID, GridDistributedTxMapping> futDhtMap,
-        Map<UUID, GridDistributedTxMapping> futNearMap) {
+        Map<UUID, GridDistributedTxMapping> futNearMap
+    ) {
         if (entry.cached().isLocal())
             return false;
 
@@ -1024,14 +1121,31 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      * @param locMap Exclude map.
      * @return {@code True} if mapped.
      */
-    private boolean map(IgniteTxEntry entry, Iterable<ClusterNode> nodes,
-        Map<UUID, GridDistributedTxMapping> globalMap, Map<UUID, GridDistributedTxMapping> locMap) {
+    private boolean map(
+        IgniteTxEntry entry,
+        Iterable<ClusterNode> nodes,
+        Map<UUID, GridDistributedTxMapping> globalMap,
+        Map<UUID, GridDistributedTxMapping> locMap
+    ) {
         boolean ret = false;
 
         if (nodes != null) {
             for (ClusterNode n : nodes) {
                 GridDistributedTxMapping global = globalMap.get(n.id());
 
+                if (!F.isEmpty(entry.entryProcessors())) {
+                    GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
+                        entry.cached().partition());
+
+                    if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
+                        CacheObject procVal = entry.entryProcessorCalculatedValue();
+
+                        entry.op(procVal == null ? DELETE : UPDATE);
+                        entry.value(procVal, true, false);
+                        entry.entryProcessors(null);
+                    }
+                }
+
                 if (global == null)
                     globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 247d350..7890831 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -79,6 +79,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     @GridDirectTransient
     private Collection<T2<EntryProcessor<Object, Object, Object>, Object[]>> entryProcessorsCol;
 
+    /** Transient field for calculated entry processor value. */
+    @GridDirectTransient
+    private CacheObject entryProcessorCalcVal;
+
     /** Transform closure bytes. */
     @GridToStringExclude
     private byte[] transformClosBytes;
@@ -775,6 +779,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
         return expiryPlc;
     }
 
+    /**
+     * @return Entry processor calculated value.
+     */
+    public CacheObject entryProcessorCalculatedValue() {
+        return entryProcessorCalcVal;
+    }
+
+    /**
+     * @param entryProcessorCalcVal Entry processor calculated value.
+     */
+    public void entryProcessorCalculatedValue(CacheObject entryProcessorCalcVal) {
+        this.entryProcessorCalcVal = entryProcessorCalcVal;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
index 9c17ebd..94bfd8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
@@ -122,38 +122,44 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
         final AtomicReference<Throwable> error = new AtomicReference<>();
         final int started = 6;
 
-        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
-            @Override public void run() {
-                try {
-                    for (int i = 0; i < started; i++) {
-                        U.sleep(1_000);
-
-                        startGrid(GRID_CNT + i);
+        try {
+            IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+                @Override public void run() {
+                    try {
+                        for (int i = 0; i < started; i++) {
+                            U.sleep(1_000);
+
+                            startGrid(GRID_CNT + i);
+                        }
+                    }
+                    catch (Exception e) {
+                        error.compareAndSet(null, e);
                     }
                 }
-                catch (Exception e) {
-                    error.compareAndSet(null, e);
-                }
-            }
-        }, 1, "starter");
+            }, 1, "starter");
 
-        try {
-            checkIncrement(invokeAll);
-        }
-        finally {
-            stop.set(true);
+            try {
+                checkIncrement(invokeAll);
+            }
+            finally {
+                stop.set(true);
 
-            fut.get(getTestTimeout());
-        }
+                fut.get(getTestTimeout());
+            }
 
-        for (int i = 0; i < NUM_SETS; i++) {
-            for (int g = 0; g < GRID_CNT + started; g++) {
-                Set<String> vals = ignite(g).<String, Set<String>>cache(null).get("set-" + i);
+            for (int i = 0; i < NUM_SETS; i++) {
+                for (int g = 0; g < GRID_CNT + started; g++) {
+                    Set<String> vals = ignite(g).<String, Set<String>>cache(null).get("set-" + i);
 
-                assertNotNull(vals);
-                assertEquals(100, vals.size());
+                    assertNotNull(vals);
+                    assertEquals(100, vals.size());
+                }
             }
         }
+        finally {
+            for (int i = 0; i < started; i++)
+                stopGrid(GRID_CNT + i);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
index 10ab1ab..b72540d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
@@ -34,7 +34,7 @@ import static org.apache.ignite.cache.CacheMode.*;
 public class IgniteCacheInvokeReadThroughTest extends IgniteCacheAbstractTest {
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-114");
+//        fail("https://issues.apache.org/jira/browse/IGNITE-114");
     }
 
     /** */