ignite-656: fixed for atomic cache single updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e914bc8e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e914bc8e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e914bc8e Branch: refs/heads/ignite-286 Commit: e914bc8ef2410f6b42147e346adfcd9c233d64e8 Parents: fb7ea4c Author: Denis Magda <dma...@gridgain.com> Authored: Mon Apr 13 13:22:40 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Mon Apr 13 13:22:40 2015 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 3 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 6 ++- .../dht/atomic/GridNearAtomicUpdateRequest.java | 47 +++++++++++++++----- .../cache/GridCacheAbstractFullApiSelfTest.java | 4 +- 4 files changed, 44 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e914bc8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 026d2c5..18a1c25 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1115,6 +1115,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheReturn retVal = null; + //TODO: support skipStore for putAll if (keys.size() > 1 && // Several keys ... writeThrough() && // and store is enabled ... !ctx.store().isLocal() && // and this is not local store ... @@ -1703,7 +1704,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { op, writeVal, req.invokeArguments(), - primary && writeThrough(), + primary && writeThrough() && !req.skipStore(), req.returnValue(), expiry, true, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e914bc8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index ac4ae2c2..547d156 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -585,7 +585,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> invokeArgs, filter, subjId, - taskNameHash); + taskNameHash, + cctx.skipStore()); req.addUpdateEntry(cacheKey, val, @@ -707,7 +708,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> invokeArgs, filter, subjId, - taskNameHash); + taskNameHash, + cctx.skipStore()); pendingMappings.put(nodeId, mapped); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e914bc8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index 3f68a46..e5229bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -126,6 +126,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** Task name hash. */ private int taskNameHash; + /** Skip write-through to a persistent storage. */ + private boolean skipStore; + /** * Empty constructor required by {@link Externalizable}. */ @@ -150,6 +153,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri * @param filter Optional filter for atomic check. * @param subjId Subject ID. * @param taskNameHash Task name hash code. + * @param skipStore Skip write-through to a persistent storage. */ public GridNearAtomicUpdateRequest( int cacheId, @@ -165,7 +169,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri @Nullable Object[] invokeArgs, @Nullable CacheEntryPredicate[] filter, @Nullable UUID subjId, - int taskNameHash + int taskNameHash, + boolean skipStore ) { this.cacheId = cacheId; this.nodeId = nodeId; @@ -182,6 +187,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri this.filter = filter; this.subjId = subjId; this.taskNameHash = taskNameHash; + this.skipStore = skipStore; keys = new ArrayList<>(); } @@ -276,6 +282,11 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri } /** + * @return Skip write-through to a persistent storage. + */ + public boolean skipStore() {return skipStore;} + + /** * @param key Key to add. * @param val Optional update value. * @param conflictTtl Conflict TTL (optional). @@ -627,36 +638,42 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri writer.incrementState(); case 16: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBoolean("skipStore", skipStore)) return false; writer.incrementState(); case 17: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 18: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 19: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 20: - if (!writer.writeMessage("updateVer", updateVer)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 21: + if (!writer.writeMessage("updateVer", updateVer)) + return false; + + writer.incrementState(); + + case 22: if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) return false; @@ -787,7 +804,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 16: - subjId = reader.readUuid("subjId"); + skipStore = reader.readBoolean("skipStore"); if (!reader.isLastRead()) return false; @@ -795,6 +812,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 17: + subjId = reader.readUuid("subjId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 18: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -806,7 +831,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 18: + case 19: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -814,7 +839,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 19: + case 20: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -822,7 +847,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 20: + case 21: updateVer = reader.readMessage("updateVer"); if (!reader.isLastRead()) @@ -830,7 +855,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 21: + case 22: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e914bc8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 30c2515..d0309c3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -4267,8 +4267,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract * @throws Exception If failed. */ public void testWithSkipStore() throws Exception { - if (gridCount() > 1) // TODO IGNITE-656 (test primary/backup/near keys with multiple nodes). - return; + //if (gridCount() > 1) // TODO IGNITE-656 (test primary/backup/near keys with multiple nodes). + // return; IgniteCache<String, Integer> cache = grid(0).cache(null);