IGNITE-656: Support skip store for transactional put operations
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6105ed3c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6105ed3c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6105ed3c Branch: refs/heads/ignite-gg-9702 Commit: 6105ed3c841919fc216108a4d4e0047ace207ff9 Parents: b291e5f Author: Denis Magda <dma...@gridgain.com> Authored: Tue Apr 14 14:17:48 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Tue Apr 14 14:17:48 2015 +0300 ---------------------------------------------------------------------- .../distributed/GridDistributedLockRequest.java | 87 +++++++++++++------- .../distributed/dht/GridDhtLockFuture.java | 3 +- .../distributed/dht/GridDhtLockRequest.java | 6 +- .../dht/GridDhtTransactionalCacheAdapter.java | 3 +- .../cache/distributed/dht/GridDhtTxRemote.java | 7 +- .../distributed/near/GridNearLockRequest.java | 2 +- .../near/GridNearTransactionalCache.java | 7 +- .../distributed/near/GridNearTxRemote.java | 7 +- .../cache/transactions/IgniteTxEntry.java | 70 ++++++++++++---- .../transactions/IgniteTxLocalAdapter.java | 5 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 63 ++++++++++++-- 11 files changed, 201 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6105ed3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java index 37b34e7..87532e1 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java @@ -88,6 +88,12 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { private boolean partLock; /** + * Arrays with flags. Each flag corresponds to key from keys list. + * Bit 1 in a flag holds skipStore value. + */ + private byte[] flags; + + /** * Empty constructor. */ public GridDistributedLockRequest() { @@ -149,6 +155,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { this.partLock = partLock; retVals = new boolean[keyCnt]; + flags = new byte[keyCnt]; } /** @@ -218,6 +225,12 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { } /** + * @param idx Key index. + * @return Skip store flag. + */ + public boolean skipStore(int idx) { return (flags[idx] & 0x1) == 1; } + + /** * @return Transaction isolation or <tt>null</tt> if not in transaction. */ public TransactionIsolation isolation() { @@ -238,13 +251,15 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { * @param retVal Flag indicating whether value should be returned. * @param cands Candidates. * @param ctx Context. + * @param skipStore Skip store for the key. * @throws IgniteCheckedException If failed. */ public void addKeyBytes( KeyCacheObject key, boolean retVal, @Nullable Collection<GridCacheMvccCandidate> cands, - GridCacheContext ctx + GridCacheContext ctx, + boolean skipStore ) throws IgniteCheckedException { if (keys == null) keys = new ArrayList<>(keysCount()); @@ -255,6 +270,8 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { retVals[idx] = retVal; + flags[idx] = skipStore ? 
(byte)(flags[idx] | 0x1) : (byte)(flags[idx] & 0xFE); + idx++; } @@ -334,84 +351,90 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { switch (writer.state()) { case 8: - if (!writer.writeIgniteUuid("futId", futId)) + if (!writer.writeByteArray("flags", flags)) return false; writer.incrementState(); case 9: - if (!writer.writeMessage("grpLockKey", grpLockKey)) + if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); case 10: - if (!writer.writeBoolean("isInTx", isInTx)) + if (!writer.writeMessage("grpLockKey", grpLockKey)) return false; writer.incrementState(); case 11: - if (!writer.writeBoolean("isInvalidate", isInvalidate)) + if (!writer.writeBoolean("isInTx", isInTx)) return false; writer.incrementState(); case 12: - if (!writer.writeBoolean("isRead", isRead)) + if (!writer.writeBoolean("isInvalidate", isInvalidate)) return false; writer.incrementState(); case 13: - if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1)) + if (!writer.writeBoolean("isRead", isRead)) return false; writer.incrementState(); case 14: - if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) + if (!writer.writeByte("isolation", isolation != null ? 
(byte)isolation.ordinal() : -1)) return false; writer.incrementState(); case 15: - if (!writer.writeMessage("nearXidVer", nearXidVer)) + if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 16: - if (!writer.writeUuid("nodeId", nodeId)) + if (!writer.writeMessage("nearXidVer", nearXidVer)) return false; writer.incrementState(); case 17: - if (!writer.writeBoolean("partLock", partLock)) + if (!writer.writeUuid("nodeId", nodeId)) return false; writer.incrementState(); case 18: - if (!writer.writeBooleanArray("retVals", retVals)) + if (!writer.writeBoolean("partLock", partLock)) return false; writer.incrementState(); case 19: - if (!writer.writeLong("threadId", threadId)) + if (!writer.writeBooleanArray("retVals", retVals)) return false; writer.incrementState(); case 20: - if (!writer.writeLong("timeout", timeout)) + if (!writer.writeLong("threadId", threadId)) return false; writer.incrementState(); case 21: + if (!writer.writeLong("timeout", timeout)) + return false; + + writer.incrementState(); + + case 22: if (!writer.writeInt("txSize", txSize)) return false; @@ -434,7 +457,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { switch (reader.state()) { case 8: - futId = reader.readIgniteUuid("futId"); + flags = reader.readByteArray("flags"); if (!reader.isLastRead()) return false; @@ -442,7 +465,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); case 9: - grpLockKey = reader.readMessage("grpLockKey"); + futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) return false; @@ -450,7 +473,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); case 10: - isInTx = reader.readBoolean("isInTx"); + grpLockKey = reader.readMessage("grpLockKey"); if (!reader.isLastRead()) return false; @@ -458,7 +481,7 @@ public class GridDistributedLockRequest extends 
GridDistributedBaseMessage { reader.incrementState(); case 11: - isInvalidate = reader.readBoolean("isInvalidate"); + isInTx = reader.readBoolean("isInTx"); if (!reader.isLastRead()) return false; @@ -466,7 +489,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); case 12: - isRead = reader.readBoolean("isRead"); + isInvalidate = reader.readBoolean("isInvalidate"); if (!reader.isLastRead()) return false; @@ -474,6 +497,14 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); case 13: + isRead = reader.readBoolean("isRead"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 14: byte isolationOrd; isolationOrd = reader.readByte("isolation"); @@ -485,7 +516,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 14: + case 15: keys = reader.readCollection("keys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -493,7 +524,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 15: + case 16: nearXidVer = reader.readMessage("nearXidVer"); if (!reader.isLastRead()) @@ -501,7 +532,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 16: + case 17: nodeId = reader.readUuid("nodeId"); if (!reader.isLastRead()) @@ -509,7 +540,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 17: + case 18: partLock = reader.readBoolean("partLock"); if (!reader.isLastRead()) @@ -517,7 +548,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 18: + case 19: retVals = reader.readBooleanArray("retVals"); if (!reader.isLastRead()) @@ -525,7 +556,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); 
- case 19: + case 20: threadId = reader.readLong("threadId"); if (!reader.isLastRead()) @@ -533,7 +564,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 20: + case 21: timeout = reader.readLong("timeout"); if (!reader.isLastRead()) @@ -541,7 +572,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { reader.incrementState(); - case 21: + case 22: txSize = reader.readInt("txSize"); if (!reader.isLastRead()) @@ -561,7 +592,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 22; + return 23; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6105ed3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 82e7f83..1d9b415 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -867,7 +867,8 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo req.addDhtKey( e.key(), invalidateRdr, - cctx); + cctx, + e.context().skipStore()); if (needVal) { // Mark last added key as needed to be preloaded. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6105ed3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java index 0574f17..5d77d5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java @@ -217,16 +217,18 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { * @param key Key. * @param invalidateEntry Flag indicating whether node should attempt to invalidate reader. * @param ctx Context. + * @param skipStore Skip store for the key. * @throws IgniteCheckedException If failed. 
*/ public void addDhtKey( KeyCacheObject key, boolean invalidateEntry, - GridCacheContext ctx + GridCacheContext ctx, + boolean skipStore ) throws IgniteCheckedException { invalidateEntries.set(idx, invalidateEntry); - addKeyBytes(key, false, null, ctx); + addKeyBytes(key, false, null, ctx, skipStore); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6105ed3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 5e43f0f..e481c75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -220,7 +220,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach txKey, null, null, - req.accessTtl()); + req.accessTtl(), + req.skipStore(i)); if (req.groupLock()) tx.groupLockKey(txKey); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6105ed3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index ffbc0a2..2673ed5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -286,13 +286,15 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { * @param val Value. * @param entryProcessors Entry processors. * @param ttl TTL. + * @param skipStore Skip store flag. */ public void addWrite(GridCacheContext cacheCtx, GridCacheOperation op, IgniteTxKey key, @Nullable CacheObject val, @Nullable Collection<T2<EntryProcessor<Object, Object, Object>, Object[]>> entryProcessors, - long ttl) { + long ttl, + boolean skipStore) { checkInternal(key); if (isSystemInvalidate()) @@ -307,7 +309,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { ttl, -1L, cached, - null); + null, + skipStore); txEntry.entryProcessors(entryProcessors); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6105ed3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java index 696cea4..3bbc45c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java @@ -292,7 +292,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { dhtVers[idx] = dhtVer; // Delegate to super. 
- addKeyBytes(key, retVal, (Collection<GridCacheMvccCandidate>)null, ctx); + addKeyBytes(key, retVal, (Collection<GridCacheMvccCandidate>)null, ctx, false); //TODO: revisit } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6105ed3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index bfd0177..26183fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -311,7 +311,12 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> "(transaction has been completed): " + req.version()); } - tx.addEntry(ctx, txKey, GridCacheOperation.NOOP, /*Value.*/null, /*dr version*/null); + tx.addEntry(ctx, + txKey, + GridCacheOperation.NOOP, + null /*Value.*/, + null /*dr version*/, + req.skipStore(i)); } // Add remote candidate before reordering. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6105ed3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java index 8c597e2..b6b6017 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java @@ -326,6 +326,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { * @param key Key to add to read set. * @param val Value. * @param drVer Data center replication version. + * @param skipStore Skip store flag. * @throws IgniteCheckedException If failed. * @return {@code True} if entry has been enlisted. 
*/ @@ -334,7 +335,8 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { IgniteTxKey key, GridCacheOperation op, CacheObject val, - @Nullable GridCacheVersion drVer + @Nullable GridCacheVersion drVer, + boolean skipStore ) throws IgniteCheckedException { checkInternal(key); @@ -366,7 +368,8 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { -1L, -1L, cached, - drVer); + drVer, + skipStore); writeMap.put(key, txEntry); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6105ed3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 95d3527..6e8d3cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -124,7 +124,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { @GridDirectTransient private AtomicBoolean prepared = new AtomicBoolean(); - /** Lock flag for colocated cache. */ + /** Lock flag for collocated cache. */ @GridDirectTransient private transient boolean locked; @@ -151,6 +151,12 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { private byte[] expiryPlcBytes; /** + * Additional flags. + * Bit 1 - for skipStore flag value. + */ + private byte flags; + + /** * Required by {@link Externalizable} */ public IgniteTxEntry() { @@ -168,6 +174,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { * @param conflictExpireTime DR expire time. * @param entry Cache entry. * @param conflictVer Data center replication version. 
+ * @param skipStore Skip store flag. */ public IgniteTxEntry(GridCacheContext<?, ?> ctx, IgniteInternalTx tx, @@ -176,7 +183,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { long ttl, long conflictExpireTime, GridCacheEntryEx entry, - @Nullable GridCacheVersion conflictVer) { + @Nullable GridCacheVersion conflictVer, + boolean skipStore) { assert ctx != null; assert tx != null; assert op != null; @@ -190,12 +198,14 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { this.conflictExpireTime = conflictExpireTime; this.conflictVer = conflictVer; + skipStore(skipStore); + key = entry.key(); cacheId = entry.context().cacheId(); } - /** + /** * This constructor is meant for local transactions. * * @param ctx Cache registry. @@ -208,6 +218,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { * @param entry Cache entry. * @param filters Put filters. * @param conflictVer Data center replication version. + * @param skipStore Skip store flag. */ public IgniteTxEntry(GridCacheContext<?, ?> ctx, IgniteInternalTx tx, @@ -218,7 +229,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { long ttl, GridCacheEntryEx entry, CacheEntryPredicate[] filters, - GridCacheVersion conflictVer) { + GridCacheVersion conflictVer, + boolean skipStore) { assert ctx != null; assert tx != null; assert op != null; @@ -232,6 +244,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { this.filters = filters; this.conflictVer = conflictVer; + skipStore(skipStore); + if (entryProcessor != null) addEntryProcessor(entryProcessor, invokeArgs); @@ -404,6 +418,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { } /** + * Sets skip store flag value. + * + * @param skipStore Skip store flag. + */ + private void skipStore(boolean skipStore){ + flags = skipStore ? (byte)(flags | 0x1) : (byte)(flags & 0xFE); + } + + /** + * @return Skip store flag. 
+ */ + public boolean skipStore() { return (flags & 0x1) == 1; }; + + /** * @return Tx key. */ public IgniteTxKey txKey() { @@ -813,30 +841,36 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { writer.incrementState(); case 6: - if (!writer.writeBoolean("grpLock", grpLock)) + if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); case 7: - if (!writer.writeMessage("key", key)) + if (!writer.writeBoolean("grpLock", grpLock)) return false; writer.incrementState(); case 8: - if (!writer.writeByteArray("transformClosBytes", transformClosBytes)) + if (!writer.writeMessage("key", key)) return false; writer.incrementState(); case 9: - if (!writer.writeLong("ttl", ttl)) + if (!writer.writeByteArray("transformClosBytes", transformClosBytes)) return false; writer.incrementState(); case 10: + if (!writer.writeLong("ttl", ttl)) + return false; + + writer.incrementState(); + + case 11: if (!writer.writeMessage("val", val)) return false; @@ -902,9 +936,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { return false; reader.incrementState(); - + case 6: - grpLock = reader.readBoolean("grpLock"); + flags = reader.readByte("flags"); if (!reader.isLastRead()) return false; @@ -912,7 +946,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); case 7: - key = reader.readMessage("key"); + grpLock = reader.readBoolean("grpLock"); if (!reader.isLastRead()) return false; @@ -920,7 +954,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); case 8: - transformClosBytes = reader.readByteArray("transformClosBytes"); + key = reader.readMessage("key"); if (!reader.isLastRead()) return false; @@ -928,7 +962,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); case 9: - ttl = reader.readLong("ttl"); + transformClosBytes = reader.readByteArray("transformClosBytes"); if (!reader.isLastRead()) return 
false; @@ -936,6 +970,14 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); case 10: + ttl = reader.readLong("ttl"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: val = reader.readMessage("val"); if (!reader.isLastRead()) @@ -955,7 +997,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 11; + return 12; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6105ed3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 29eabf3..509a96c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -506,7 +506,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter boolean skipNear = near() && store.isWriteToStoreFromDht(); for (IgniteTxEntry e : writeEntries) { - if (skipNear && e.cached().isNear()) + if ((skipNear && e.cached().isNear()) || e.skipStore()) continue; boolean intercept = e.context().config().getInterceptor() != null; @@ -3229,7 +3229,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter hasDrTtl ? 
drTtl : -1L, entry, filter, - drVer); + drVer, + entry.context().skipStore()); txEntry.conflictExpireTime(drExpireTime); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6105ed3c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 9b0d675..a133268 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -4267,9 +4267,6 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract * @throws Exception If failed. */ public void testWithSkipStore() throws Exception { - //if (gridCount() > 1) // TODO IGNITE-656 (test primary/backup/near keys with multiple nodes). 
- // return; - IgniteCache<String, Integer> cache = grid(0).cache(null); IgniteCache<String, Integer> cacheSkipStore = cache.withSkipStore(); @@ -4380,7 +4377,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assertTrue(map.containsKey(key)); } - cache.removeAll(); + //cache.removeAll(); //TODO: doesn't work in transactional mode + cache.removeAll(data.keySet()); for (String key : keys) { assertNull(cacheSkipStore.get(key)); @@ -4388,7 +4386,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assertFalse(map.containsKey(key)); } - // Final checks + assertTrue(map.size() == 0); + + // Miscellaneous checks String newKey = "New key"; @@ -4417,6 +4417,59 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assertNull(cacheSkipStore.get(rmvKey)); assertTrue(map.containsKey(rmvKey)); + + assertTrue(cache.size(CachePeekMode.ALL) == 0); + assertTrue(cacheSkipStore.size(CachePeekMode.ALL) == 0); + + cache.remove(rmvKey); + + assertTrue(map.size() == 0); + + if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL) { + checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, TransactionConcurrency.OPTIMISTIC, + TransactionIsolation.READ_COMMITTED); + } + } + + private void checkSkipStoreWithTransaction(IgniteCache<String, Integer> cache, + IgniteCache<String, Integer> cacheSkipStore, + Map<String, Integer> data, + List<String> keys, + TransactionConcurrency txConcurrency, + TransactionIsolation txIsolation) throws Exception { + + cache.removeAll(); + + assertTrue(cache.size(CachePeekMode.ALL) == 0); + assertTrue(cacheSkipStore.size(CachePeekMode.ALL) == 0); + assertTrue(map.size() == 0); + + IgniteTransactions txs = grid(0).transactions(); + + Transaction tx = txs.txStart(txConcurrency, txIsolation); + + for (int i = 0; i < keys.size(); i++) + cacheSkipStore.put(keys.get(i), i); + + for (String key: keys) { + assertNotNull(cacheSkipStore.get(key)); + 
assertNotNull(cache.get(key)); + assertFalse(map.containsKey(key)); + } + + tx.commit(); + + tx = txs.txStart(txConcurrency, txIsolation); + + cacheSkipStore.putAll(data); + + for (String key: keys) { + assertNotNull(cacheSkipStore.get(key)); + assertNotNull(cache.get(key)); + assertFalse(map.containsKey(key)); + } + + tx.commit(); } /**