IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7c73fc5d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7c73fc5d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7c73fc5d Branch: refs/heads/ignite-104 Commit: 7c73fc5d8d81f3cded6bffd4dcf3d1e48ad84d64 Parents: e5c69b8 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Wed Jul 29 17:26:44 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Wed Jul 29 17:26:44 2015 -0700 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 9 ++------- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 3 --- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 19 ++----------------- .../distributed/near/GridNearAtomicCache.java | 4 ++-- 4 files changed, 6 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7c73fc5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 02e48df..31606b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1767,7 +1767,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { dhtFut.addNearWriteEntries(filteredReaders, entry, updRes.newValue(), - op == TRANSFORM ? req.entryProcessor(i) : null, updRes.newTtl(), updRes.conflictExpireTime()); } @@ -2034,13 +2033,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (dhtFut != null) { - EntryProcessor<Object, Object, Object> entryProcessor = - entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()); - if (!batchRes.readersOnly()) dhtFut.addWriteEntry(entry, writeVal, - entryProcessor, + entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()), updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE, null); @@ -2049,7 +2045,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { dhtFut.addNearWriteEntries(filteredReaders, entry, writeVal, - entryProcessor, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE); } @@ -2465,7 +2460,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /*event*/true, /*metrics*/true, /*primary*/false, - /*check version*/!req.forceTransformBackups(), + /*check version*/op != TRANSFORM || !req.forceTransformBackups(), req.topologyVersion(), CU.empty0(), replicate ? DR_BACKUP : DR_NONE, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7c73fc5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 15ec121..ab0c2e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -270,14 +270,12 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> * @param readers Entry readers. * @param entry Entry. * @param val Value. - * @param entryProcessor Entry processor.. * @param ttl TTL for near cache update (optional). * @param expireTime Expire time for near cache update (optional). */ public void addNearWriteEntries(Iterable<UUID> readers, GridDhtCacheEntry entry, @Nullable CacheObject val, - EntryProcessor<Object, Object, Object> entryProcessor, long ttl, long expireTime) { CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode(); @@ -323,7 +321,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> updateReq.addNearWriteValue(entry.key(), val, - entryProcessor, ttl, expireTime); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7c73fc5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 7149dec..6340c93 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -267,36 +267,21 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** * @param key Key to add. * @param val Value, {@code null} if should be removed. - * @param entryProcessor Entry processor. * @param ttl TTL. * @param expireTime Expire time. */ public void addNearWriteValue(KeyCacheObject key, @Nullable CacheObject val, - EntryProcessor<Object, Object, Object> entryProcessor, long ttl, long expireTime) { if (nearKeys == null) { nearKeys = new ArrayList<>(); - - if (forceTransformBackups) { - nearEntryProcessors = new ArrayList<>(); - nearEntryProcessorsBytes = new ArrayList<>(); - } - else - nearVals = new ArrayList<>(); + nearVals = new ArrayList<>(); } nearKeys.add(key); - - if (forceTransformBackups) { - assert entryProcessor != null; - - nearEntryProcessors.add(entryProcessor); - } - else - nearVals.add(val); + nearVals.add(val); if (ttl >= 0) { if (nearTtls == null) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7c73fc5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 0aa1638..707facc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -225,7 +225,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /*write-through*/false, /*read-through*/false, /*retval*/false, - /**expiry policy*/null, + /*expiry policy*/null, /*event*/true, /*metrics*/true, /*primary*/false, @@ -336,7 +336,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /*event*/true, /*metrics*/true, /*primary*/false, - /*check version*/!req.forceTransformBackups(), + /*check version*/op != TRANSFORM || !req.forceTransformBackups(), req.topologyVersion(), CU.empty0(), DR_NONE,