http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 1382064,6f75e32..08e2b94 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@@ -4262,9 -4570,9 +4570,9 @@@ public abstract class GridCacheAdapter< true, op.single(), ctx.system(), - PESSIMISTIC, + OPTIMISTIC, READ_COMMITTED, - ctx.kernalContext().config().getTransactionsConfiguration().getDefaultTxTimeout(), + ctx.kernalContext().config().getTransactionConfiguration().getDefaultTxTimeout(), ctx.hasFlag(INVALIDATE), !ctx.hasFlag(SKIP_STORE), 0,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index ada0c1c,0c8fa66..bd1f260 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@@ -1213,97 -1221,120 +1210,94 @@@ public abstract class GridCacheMapEntry IgniteBiTuple<Boolean, V> interceptRes = null; - try { - synchronized (this) { - checkObsolete(); - - if (tx != null && tx.groupLock() && cctx.kernalContext().config().isCacheSanityCheckEnabled()) - groupLockSanityCheck(tx); - else - assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) : - "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']'; - - boolean startVer = isStartVersion(); - - if (startVer) { - if (tx != null && !tx.local() && tx.onePhaseCommit()) - // Must promote to check version for one-phase commit tx. - unswap(true, retval); - else - // Release swap. - releaseSwap(); - } + synchronized (this) { + checkObsolete(); - newVer = explicitVer != null ? explicitVer : tx == null ? nextVersion() : tx.writeVersion(); + if (tx != null && tx.groupLock() && cctx.kernalContext().config().isCacheSanityCheckEnabled()) + groupLockSanityCheck(tx); + else + assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) : + "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']'; - if (tx != null && !tx.local() && tx.onePhaseCommit() && explicitVer == null) { - if (!startVer && ver.compareTo(newVer) > 0) { - if (log.isDebugEnabled()) - log.debug("Skipping entry removal for one-phase commit since current entry version is " + - "greater than write version [entry=" + this + ", newVer=" + newVer + ']'); + boolean startVer = isStartVersion(); - return new GridCacheUpdateTxResult<>(false, null); - } + if (startVer) { + // Release swap. + releaseSwap(); + } - if (!detached()) - enqueueVer = newVer; - } + newVer = explicitVer != null ? explicitVer : tx == null ? nextVersion() : tx.writeVersion(); - old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : val; + old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : val; - if (intercept) { - interceptRes = cctx.config().<K, V>getInterceptor().onBeforeRemove(key, old); + if (intercept) { + interceptRes = cctx.config().<K, V>getInterceptor().onBeforeRemove(key, old); - if (cctx.cancelRemove(interceptRes)) - return new GridCacheUpdateTxResult<>(false, cctx.<V>unwrapTemporary(interceptRes.get2())); - } + if (cctx.cancelRemove(interceptRes)) + return new GridCacheUpdateTxResult<>(false, cctx.<V>unwrapTemporary(interceptRes.get2())); + } - GridCacheValueBytes oldBytes = valueBytesUnlocked(); + GridCacheValueBytes oldBytes = valueBytesUnlocked(); - if (old == null) - old = saveValueForIndexUnlocked(); + if (old == null) + old = saveValueForIndexUnlocked(); - // Clear indexes inside of synchronization since indexes - // can be updated without actually holding entry lock. - clearIndex(old); + // Clear indexes inside of synchronization since indexes + // can be updated without actually holding entry lock. + clearIndex(old); - boolean hadValPtr = valPtr != 0; + boolean hadValPtr = valPtr != 0; - update(null, null, 0, 0, newVer); + update(null, null, 0, 0, newVer); - if (cctx.offheapTiered() && hadValPtr) { - boolean rmv = cctx.swap().removeOffheap(key, getOrMarshalKeyBytes()); + if (cctx.offheapTiered() && hadValPtr) { + boolean rmv = cctx.swap().removeOffheap(key, getOrMarshalKeyBytes()); - assert rmv; - } + assert rmv; + } - if (cctx.deferredDelete() && !detached() && !isInternal()) { - if (!deletedUnlocked()) { - deletedUnlocked(true); + if (cctx.deferredDelete() && !detached() && !isInternal()) { + if (!deletedUnlocked()) { + deletedUnlocked(true); - if (tx != null) { - GridCacheMvcc<K> mvcc = mvccExtras(); + if (tx != null) { + GridCacheMvcc<K> mvcc = mvccExtras(); - if (mvcc == null || mvcc.isEmpty(tx.xidVersion())) - clearReaders(); - else - clearReader(tx.originatingNodeId()); - } + if (mvcc == null || mvcc.isEmpty(tx.xidVersion())) + clearReaders(); + else + clearReader(tx.originatingNodeId()); } } + } - drReplicate(drType, null, null, newVer); + drReplicate(drType, null, null, newVer); - if (metrics && cctx.cache().configuration().isStatisticsEnabled()) - cctx.cache().metrics0().onRemove(); + if (metrics && cctx.cache().configuration().isStatisticsEnabled()) + cctx.cache().metrics0().onRemove(); - if (tx == null) - obsoleteVer = newVer; - else { - // Only delete entry if the lock is not explicit. - if (tx.groupLock() || lockedBy(tx.xidVersion())) - obsoleteVer = tx.xidVersion(); - else if (log.isDebugEnabled()) - log.debug("Obsolete version was not set because lock was explicit: " + this); - } + if (tx == null) + obsoleteVer = newVer; + else { + // Only delete entry if the lock is not explicit. + if (tx.groupLock() || lockedBy(tx.xidVersion())) + obsoleteVer = tx.xidVersion(); + else if (log.isDebugEnabled()) + log.debug("Obsolete version was not set because lock was explicit: " + this); + } - if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) { - V evtOld = cctx.unwrapTemporary(old); + if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) { + V evtOld = cctx.unwrapTemporary(old); - cctx.events().addEvent(partition(), key, evtNodeId, tx == null ? null : tx.xid(), newVer, - EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hasValueUnlocked(), subjId, - null, taskName); - } + cctx.events().addEvent(partition(), key, evtNodeId, tx == null ? null : tx.xid(), newVer, + EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hasValueUnlocked(), subjId, + null, taskName); + } - CacheMode mode = cctx.config().getCacheMode(); - - if (mode == CacheMode.LOCAL || mode == CacheMode.REPLICATED || - (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdate(this, key, null, null, old, oldBytes, false); + if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) + cctx.continuousQueries().onEntryUpdate(this, key, null, null, old, oldBytes, false); - cctx.dataStructures().onEntryUpdated(key, true); - } - } - finally { - if (enqueueVer != null) { - assert cctx.deferredDelete(); - - cctx.onDeferredDelete(this, enqueueVer); - } + cctx.dataStructures().onEntryUpdated(key, true); } // Persist outside of synchronization. The correctness of the http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index a5d39ab,d7b1914..1d064e2 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@@ -279,12 -281,21 +281,28 @@@ public class GridCachePartitionExchange } /** {@inheritDoc} */ + @Override protected void stop0(boolean cancel) { + super.stop0(cancel); + + exchFuts = null; + } + ++ /** {@inheritDoc} */ + @SuppressWarnings("LockAcquiredButNotSafelyReleased") + @Override protected void stop0(boolean cancel) { + super.stop0(cancel); + + // Do not allow any activity in exchange manager after stop. + busyLock.writeLock().lock(); + + exchFuts = null; + } + + /** + * @param cacheId Cache ID. + * @param exchId Exchange ID. + * @return Topology. + */ public GridDhtPartitionTopology<K, V> clientTopology(int cacheId, GridDhtPartitionExchangeId exchId) { GridClientPartitionTopology<K, V> top = clientTops.get(cacheId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java index c37be22,d8cc296..9b12d1f --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java @@@ -20,12 -20,13 +20,12 @@@ package org.apache.ignite.internal.proc import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; - import org.apache.ignite.internal.processors.cache.version.*; - import org.apache.ignite.lang.*; import org.apache.ignite.internal.processors.cache.transactions.*; - import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.plugin.extensions.communication.*; import org.jetbrains.annotations.*; import java.io.*; @@@ -269,7 -367,11 +269,7 @@@ public class GridDistributedTxFinishReq _clone.commit = commit; _clone.syncCommit = syncCommit; _clone.syncRollback = syncRollback; - _clone.baseVer = baseVer; + _clone.baseVer = baseVer != null ? (GridCacheVersion)baseVer.clone() : null; - _clone.writeEntries = writeEntries; - _clone.writeEntriesBytes = writeEntriesBytes; - _clone.recoveryWrites = recoveryWrites; - _clone.recoveryWritesBytes = recoveryWritesBytes; _clone.txSize = txSize; _clone.grpLockKey = grpLockKey; _clone.grpLockKeyBytes = grpLockKeyBytes; @@@ -371,102 -485,110 +383,112 @@@ if (!super.readFrom(buf)) return false; - switch (commState.idx) { + switch (state) { case 8: - GridCacheVersion baseVer0 = commState.getCacheVersion(); + baseVer = reader.readMessage("baseVer"); - if (baseVer0 == CACHE_VER_NOT_READ) + if (!reader.isLastRead()) return false; - baseVer = baseVer0; - - commState.idx++; + state++; case 9: - if (buf.remaining() < 1) - return false; + commit = reader.readBoolean("commit"); - commit = commState.getBoolean(); + if (!reader.isLastRead()) + return false; - commState.idx++; + state++; case 10: - GridCacheVersion commitVer0 = commState.getCacheVersion(); + commitVer = reader.readMessage("commitVer"); - if (commitVer0 == CACHE_VER_NOT_READ) + if (!reader.isLastRead()) return false; - commitVer = commitVer0; - - commState.idx++; + state++; case 11: - IgniteUuid futId0 = commState.getGridUuid(); + futId = reader.readIgniteUuid("futId"); - if (futId0 == GRID_UUID_NOT_READ) + if (!reader.isLastRead()) return false; - futId = futId0; - - commState.idx++; + state++; case 12: - byte[] grpLockKeyBytes0 = commState.getByteArray(); + grpLockKeyBytes = reader.readByteArray("grpLockKeyBytes"); - if (grpLockKeyBytes0 == BYTE_ARR_NOT_READ) + if (!reader.isLastRead()) return false; - grpLockKeyBytes = grpLockKeyBytes0; - - commState.idx++; + state++; case 13: - if (buf.remaining() < 1) - return false; + invalidate = reader.readBoolean("invalidate"); - invalidate = commState.getBoolean(); + if (!reader.isLastRead()) + return false; - commState.idx++; + state++; case 14: - if (buf.remaining() < 1) - return false; + recoveryWritesBytes = reader.readCollection("recoveryWritesBytes", byte[].class); - syncCommit = commState.getBoolean(); + if (!reader.isLastRead()) + return false; - commState.idx++; + state++; case 15: - if (buf.remaining() < 1) - return false; + syncCommit = reader.readBoolean("syncCommit"); - syncRollback = commState.getBoolean(); + if (!reader.isLastRead()) + return false; - commState.idx++; + state++; case 16: + syncRollback = reader.readBoolean("syncRollback"); + if (buf.remaining() < 1) + return false; - sys = commState.getBoolean(); + if (!reader.isLastRead()) + return false; - commState.idx++; + state++; case 17: - if (buf.remaining() < 8) - return false; + sys = reader.readBoolean("sys"); - threadId = commState.getLong(); + if (!reader.isLastRead()) + return false; - commState.idx++; + state++; case 18: - if (buf.remaining() < 4) + threadId = reader.readLong("threadId"); + + if (!reader.isLastRead()) return false; - txSize = commState.getInt(); + state++; + + case 19: + txSize = reader.readInt("txSize"); + + if (!reader.isLastRead()) + return false; + + state++; + + case 20: + writeEntriesBytes = reader.readCollection("writeEntriesBytes", byte[].class); + + if (!reader.isLastRead()) + return false; - commState.idx++; + state++; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java index 257a331,7e3da3b..8616472 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java @@@ -21,10 -21,9 +21,10 @@@ import org.apache.ignite.cluster.* import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.*; + import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; - import org.apache.ignite.internal.util.tostring.*; import org.jetbrains.annotations.*; import java.io.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index ff31a72,54d7757..7478b88 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@@ -133,10 -130,9 +133,10 @@@ public class GridDistributedTxPrepareRe * @param grpLockKey Group lock key. * @param partLock {@code True} if preparing group-lock transaction with partition lock. * @param txNodes Transaction nodes mapping. + * @param onePhaseCommit One phase commit flag. */ public GridDistributedTxPrepareRequest( - IgniteTxEx<K, V> tx, + IgniteInternalTx<K, V> tx, @Nullable Collection<IgniteTxEntry<K, V>> reads, Collection<IgniteTxEntry<K, V>> writes, IgniteTxKey grpLockKey, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 9bf7255,4992ba8..8d1ce82 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@@ -505,42 -505,50 +505,45 @@@ public class GridDistributedTxRemoteAda GridCacheVersion explicitVer = txEntry.drVersion(); + if (txEntry.ttl() == CU.TTL_ZERO) + op = DELETE; + - if (finalizationStatus() == FinalizationStatus.RECOVERY_FINISH || optimistic()) { - // Primary node has left the grid so we have to process conflicts on backups. - if (explicitVer == null) - explicitVer = writeVersion(); // Force write version to be used. - boolean drNeedResolve = - cacheCtx.conflictNeedResolve(cached.version(), explicitVer); + boolean drNeedResolve = + cacheCtx.conflictNeedResolve(cached.version(), explicitVer); - if (drNeedResolve) { - IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContextImpl<K, V>> - drRes = conflictResolve(op, txEntry.key(), val, valBytes, - txEntry.ttl(), txEntry.drExpireTime(), explicitVer, cached); + if (drNeedResolve) { + IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>> + drRes = conflictResolve(op, txEntry.key(), val, valBytes, + txEntry.ttl(), txEntry.drExpireTime(), explicitVer, cached); - assert drRes != null; + assert drRes != null; - GridCacheVersionConflictContextImpl<K, V> drCtx = drRes.get2(); + GridCacheVersionConflictContext<K, V> drCtx = drRes.get2(); - if (drCtx.isUseOld()) - op = NOOP; - else if (drCtx.isUseNew()) { - txEntry.ttl(drCtx.ttl()); - - if (drCtx.newEntry().dataCenterId() != cacheCtx.dataCenterId()) - txEntry.drExpireTime(drCtx.expireTime()); - else - txEntry.drExpireTime(-1L); - } - else if (drCtx.isMerge()) { - op = drRes.get1(); - val = drCtx.mergeValue(); - valBytes = null; - explicitVer = writeVersion(); - - txEntry.ttl(drCtx.ttl()); + if (drCtx.isUseOld()) + op = NOOP; + else if (drCtx.isUseNew()) { + txEntry.ttl(drCtx.ttl()); + + if (drCtx.newEntry().dataCenterId() != cacheCtx.dataCenterId()) + txEntry.drExpireTime(drCtx.expireTime()); + else txEntry.drExpireTime(-1L); - } } - else - // Nullify explicit version so that innerSet/innerRemove will work as usual. - explicitVer = null; + else if (drCtx.isMerge()) { + op = drRes.get1(); + val = drCtx.mergeValue(); + valBytes = null; + explicitVer = writeVersion(); + + txEntry.ttl(drCtx.ttl()); + txEntry.drExpireTime(-1L); + } } + else + // Nullify explicit version so that innerSet/innerRemove will work as usual. + explicitVer = null; if (op == CREATE || op == UPDATE) { // Invalidate only for near nodes (backups cannot be invalidated). http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index cc36335,b2fcc50..aaa562c --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@@ -621,21 -621,14 +620,9 @@@ public class GridDhtCacheEntry<K, V> ex GridCacheMvcc<K> mvcc = mvccExtras(); - GridCacheMvccCandidate<K> cand = mvcc == null ? null : mvcc.candidate(ver); - - if (cand != null) - cand.mappedNodeIds(mappings); - - return cand; + return mvcc == null ? null : mvcc.candidate(ver); } - /** {@inheritDoc} */ - @Override public CacheEntry<K, V> wrap(boolean prjAware) { - GridCacheContext<K, V> nearCtx = cctx.dht().near().context(); - - GridCacheProjectionImpl<K, V> prjPerCall = nearCtx.projectionPerCall(); - - if (prjPerCall != null && prjAware) - return new GridPartitionedCacheEntryImpl<>(prjPerCall, nearCtx, key, this); - - return new GridPartitionedCacheEntryImpl<>(null, nearCtx, key, this); - } - /** * @return Cache name. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 922e644,a93bf7d..793066a --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@@ -20,23 -20,25 +20,24 @@@ package org.apache.ignite.internal.proc import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; + import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; + import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; + import org.apache.ignite.internal.transactions.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.lang.*; - import org.apache.ignite.transactions.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.tostring.*; import org.jetbrains.annotations.*; import java.io.*; import java.util.*; import java.util.concurrent.atomic.*; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; import static org.apache.ignite.transactions.IgniteTxState.*; - import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; /** * @@@ -91,8 -93,10 +92,8 @@@ public final class GridDhtTxFinishFutur * @param commit Commit flag. */ public GridDhtTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridDhtTxLocalAdapter<K, V> tx, boolean commit) { - super(cctx.kernalContext(), F.<IgniteTx>identityReducer(tx)); + super(cctx.kernalContext(), F.<IgniteInternalTx>identityReducer(tx)); - assert cctx != null; - this.cctx = cctx; this.tx = tx; this.commit = commit; @@@ -329,8 -334,25 +330,8 @@@ tx.subjectId(), tx.taskNameHash()); - if (!tx.pessimistic()) { - int idx = 0; - - for (IgniteTxEntry<K, V> e : dhtMapping.writes()) - req.ttl(idx++, e.ttl()); - - if (nearMapping != null) { - idx = 0; - - for (IgniteTxEntry<K, V> e : nearMapping.writes()) - req.nearTtl(idx++, e.ttl()); - } - } - - if (tx.onePhaseCommit()) - req.writeVersion(tx.writeVersion()); - try { - cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + cctx.io().send(n, req, tx.ioPolicy()); if (sync) res = true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index 9c39c7c,9bdf4bd..8e39b11 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@@ -17,15 -17,18 +17,16 @@@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import org.apache.ignite.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; + import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; - import org.apache.ignite.internal.processors.cache.transactions.*; - import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; + import org.apache.ignite.plugin.extensions.communication.*; -import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; import java.io.*; @@@ -66,10 -81,14 +67,8 @@@ public class GridDhtTxFinishRequest<K, private UUID subjId; /** Task name hash. */ - @GridDirectVersion(2) private int taskNameHash; - /** TTLs for optimistic transaction. */ - private GridLongList ttls; - - /** Near cache TTLs for optimistic transaction. */ - private GridLongList nearTtls; - /** * Empty constructor required for {@link Externalizable}. */ @@@ -247,9 -371,12 +246,13 @@@ _clone.sysInvalidate = sysInvalidate; _clone.topVer = topVer; _clone.pendingVers = pendingVers; + _clone.onePhaseCommit = onePhaseCommit; + _clone.writeVer = writeVer != null ? (GridCacheVersion)writeVer.clone() : null; + _clone.writeVer = writeVer; _clone.subjId = subjId; _clone.taskNameHash = taskNameHash; + _clone.ttls = ttls != null ? (GridLongList)ttls.clone() : null; + _clone.nearTtls = nearTtls != null ? (GridLongList)nearTtls.clone() : null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 4077275,6f5284a..1e2a222 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@@ -363,8 -350,7 +364,8 @@@ public class GridDhtTxLocal<K, V> exten * @param lastBackups IDs of backup nodes receiving last prepare request. * @return Future that will be completed when locks are acquired. */ - public IgniteInternalFuture<IgniteTxEx<K, V>> prepareAsync( - public IgniteInternalFuture<IgniteInternalTx<K, V>> prepareAsync(@Nullable Iterable<IgniteTxEntry<K, V>> reads, ++ public IgniteInternalFuture<IgniteInternalTx<K, V>> prepareAsync( + @Nullable Iterable<IgniteTxEntry<K, V>> reads, @Nullable Iterable<IgniteTxEntry<K, V>> writes, Map<IgniteTxKey<K>, GridCacheVersion> verMap, long msgId, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index ee05fa2,ba4fbbd..7d75f92 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@@ -20,31 -20,29 +20,33 @@@ package org.apache.ignite.internal.proc import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; + import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; - import org.apache.ignite.internal.processors.cache.version.*; - import org.apache.ignite.internal.util.*; - import org.apache.ignite.lang.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.transactions.*; + import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.dr.*; - import org.apache.ignite.internal.util.typedef.*; - import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.transactions.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; +import javax.cache.processor.*; import java.io.*; import java.util.*; import java.util.concurrent.atomic.*; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*; import static org.apache.ignite.transactions.IgniteTxState.*; - import static org.apache.ignite.events.IgniteEventType.*; + import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; + import static org.apache.ignite.transactions.IgniteTxState.*; /** * @@@ -142,18 -122,10 +144,18 @@@ public final class GridDhtTxPrepareFutu * @param last {@code True} if this is last prepare operation for node. * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare. */ - public GridDhtTxPrepareFuture(GridCacheSharedContext<K, V> cctx, final GridDhtTxLocalAdapter<K, V> tx, - IgniteUuid nearMiniId, Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap, boolean last, Collection<UUID> lastBackups) { + public GridDhtTxPrepareFuture( + GridCacheSharedContext<K, V> cctx, + final GridDhtTxLocalAdapter<K, V> tx, + IgniteUuid nearMiniId, + Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap, + boolean last, + boolean retVal, + Collection<UUID> lastBackups, + IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb + ) { - super(cctx.kernalContext(), new IgniteReducer<IgniteTxEx<K, V>, IgniteTxEx<K, V>>() { - @Override public boolean collect(IgniteTxEx<K, V> e) { + super(cctx.kernalContext(), new IgniteReducer<IgniteInternalTx<K, V>, IgniteInternalTx<K, V>>() { + @Override public boolean collect(IgniteInternalTx<K, V> e) { return true; } @@@ -515,91 -383,53 +517,91 @@@ this.err.compareAndSet(null, err); - if (replied.compareAndSet(false, true)) { - try { - // Must clear prepare future before response is sent or listeners are notified. - if (tx.optimistic()) - tx.clearPrepareFuture(this); + // Must clear prepare future before response is sent or listeners are notified. + if (tx.optimistic()) + tx.clearPrepareFuture(this); - if (!tx.nearNodeId().equals(cctx.localNodeId())) { - // Send reply back to originating near node. - GridNearTxPrepareResponse<K, V> res = new GridNearTxPrepareResponse<>(tx.nearXidVersion(), - tx.nearFutureId(), nearMiniId, tx.xidVersion(), tx.invalidPartitions(), this.err.get()); + if (tx.onePhaseCommit()) { + assert last; - addDhtValues(res); + // Must create prepare response before transaction is committed to grab correct return value. + final GridNearTxPrepareResponse<K, V> res = createPrepareResponse(); - GridCacheVersion min = tx.minVersion(); + onComplete(); + + if (!tx.near()) { + if (tx.markFinalizing(IgniteTxEx.FinalizationStatus.USER_FINISH)) { + IgniteInternalFuture<IgniteTx> fut = this.err.get() == null ? tx.commitAsync() : tx.rollbackAsync(); - res.completedVersions(cctx.tm().committedVersions(min), cctx.tm().rolledbackVersions(min)); + fut.listenAsync(new CIX1<IgniteInternalFuture<IgniteTx>>() { + @Override public void applyx(IgniteInternalFuture<IgniteTx> gridCacheTxGridFuture) { + try { + if (replied.compareAndSet(false, true)) + sendPrepareResponse(res); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send prepare response for transaction: " + tx, e); + } + } + }); + } + } + else { + try { + if (replied.compareAndSet(false, true)) + sendPrepareResponse(res); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send prepare response for transaction: " + tx, e); + } + } - res.pending(localDhtPendingVersions(tx.writeEntries(), min)); + return true; + } + else { + if (replied.compareAndSet(false, true)) { + try { + sendPrepareResponse(createPrepareResponse()); - cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy()); + return true; } + catch (IgniteCheckedException e) { + onError(e); - return true; + return true; + } + finally { + // Will call super.onDone(). + onComplete(); + } } - catch (IgniteCheckedException e) { - onError(e); + else { + // Other thread is completing future. Wait for it to complete. + try { + get(); + } + catch (IgniteInterruptedException e) { + onError(new IgniteCheckedException("Got interrupted while waiting for replies to be sent.", e)); + } + catch (IgniteCheckedException ignored) { + // No-op, get() was just synchronization. + } - return true; - } - finally { - // Will call super.onDone(). - onComplete(); + return false; } } + } + + /** + * @throws IgniteCheckedException If failed to send response. + */ + private void sendPrepareResponse(GridNearTxPrepareResponse<K, V> res) throws IgniteCheckedException { + if (!tx.nearNodeId().equals(cctx.localNodeId())) - cctx.io().send(tx.nearNodeId(), res); ++ cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy()); else { - // Other thread is completing future. Wait for it to complete. - try { - get(); - } - catch (IgniteInterruptedCheckedException e) { - onError(new IgniteCheckedException("Got interrupted while waiting for replies to be sent.", e)); - } - catch (IgniteCheckedException ignored) { - // No-op, get() was just synchronization. - } + assert completeCb != null; - return false; + completeCb.apply(res); } } @@@ -899,36 -742,13 +901,36 @@@ catch (GridCacheEntryRemovedException ignore) { assert false : "Got removed exception on entry with dht local candidate: " + entry; } + + idx++; + } + + if (!F.isEmpty(nearWrites)) { + for (IgniteTxEntry<K, V> entry : nearWrites) { + try { + GridCacheMvccCandidate<K> added = entry.cached().candidate(version()); + + assert added != null; + assert added.dhtLocal(); + + if (added.ownerVersion() != null) + req.owned(entry.txKey(), added.ownerVersion()); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + assert false : "Got removed exception on entry with dht local candidate: " + entry; + } + } } + assert req.transactionNodes() != null; + //noinspection TryWithIdenticalCatches try { - cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); - cctx.io().send(nearMapping.node(), req, tx.ioPolicy()); ++ cctx.io().send(n, req, tx.ioPolicy()); } - catch (ClusterTopologyException e) { + catch (ClusterTopologyCheckedException e) { fut.onResult(e); } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java index 732703e,cd02ff6..0938525 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java @@@ -363,127 -369,85 +360,91 @@@ public class GridNearLockRequest<K, V> if (!super.writeTo(buf)) return false; - if (!commState.typeWritten) { - if (!commState.putByte(directType())) + if (!typeWritten) { + if (!writer.writeByte(null, directType())) return false; - commState.typeWritten = true; + typeWritten = true; } - switch (commState.idx) { - case 22: - if (!commState.putLong(accessTtl)) - return false; - - commState.idx++; - - case 23: - if (dhtVers != null) { - if (commState.it == null) { - if (!commState.putInt(dhtVers.length)) - return false; - - commState.it = arrayIterator(dhtVers); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - if (!commState.putCacheVersion((GridCacheVersion)commState.cur)) - return false; - - commState.cur = NULL; - } - - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; - + switch (state) { case 24: - if (filterBytes != null) { - if (commState.it == null) { - if (!commState.putInt(filterBytes.length)) - return false; - - commState.it = arrayIterator(filterBytes); - } - - while (commState.it.hasNext() || commState.cur != NULL) { - if (commState.cur == NULL) - commState.cur = commState.it.next(); - - if (!commState.putByteArray((byte[])commState.cur)) - return false; - - commState.cur = NULL; - } + if (!writer.writeLong("accessTtl", accessTtl)) + return false; - commState.it = null; - } else { - if (!commState.putInt(-1)) - return false; - } - - commState.idx++; + state++; case 25: - if (!commState.putBoolean(implicitSingleTx)) + if (!writer.writeObjectArray("dhtVers", dhtVers, GridCacheVersion.class)) return false; - commState.idx++; + state++; case 26: - if (!commState.putBoolean(implicitTx)) + if (!writer.writeObjectArray("filterBytes", filterBytes, byte[].class)) return false; - commState.idx++; + state++; case 27: - if (!commState.putGridUuid(miniId)) + if (!writer.writeBoolean("hasTransforms", hasTransforms)) return false; - commState.idx++; + state++; case 28: - if (!commState.putBoolean(onePhaseCommit)) + if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx)) return false; - commState.idx++; + state++; + + case 29: + if (!writer.writeBoolean("implicitTx", implicitTx)) + return false; + + state++; + case 29: + if (!commState.putBoolean(syncCommit)) + return false; + + commState.idx++; + case 30: - if (!commState.putLong(topVer)) + if (!writer.writeIgniteUuid("miniId", miniId)) return false; - commState.idx++; + state++; case 31: - if (!commState.putUuid(subjId)) + if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit)) return false; - commState.idx++; + state++; case 32: - if (!commState.putInt(taskNameHash)) + if (!writer.writeUuid("subjId", subjId)) return false; - commState.idx++; + state++; case 33: - if (!commState.putBoolean(hasTransforms)) + if (!writer.writeBoolean("syncCommit", syncCommit)) return false; - commState.idx++; + state++; + + case 34: + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + state++; + + case 35: + if (!writer.writeLong("topVer", topVer)) + return false; + + state++; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index e77689f,6fa2f6a..c5524f8 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@@ -688,8 -695,8 +689,8 @@@ public class GridNearTxLocal<K, V> exte } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<IgniteTxEx<K, V>> prepareAsync() { + @Override public IgniteInternalFuture<IgniteInternalTx<K, V>> prepareAsync() { - IgniteInternalFuture<IgniteInternalTx<K, V>> fut = prepFut.get(); + GridNearTxPrepareFuture<K, V> fut = (GridNearTxPrepareFuture<K, V>)prepFut.get(); if (fut == null) { // Future must be created before any exception can be thrown. @@@ -710,14 -718,16 +711,14 @@@ if (!state(PREPARING)) { if (setRollbackOnly()) { if (timedOut()) - fut.onError(new IgniteTxTimeoutException("Transaction timed out and was " + - pessimisticFut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was " + ++ fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was " + "rolled back: " + this)); else - pessimisticFut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" + + fut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" + state() + ", tx=" + this + ']')); } else - fut.onError(new IgniteTxRollbackException("Invalid transaction state for prepare " + - pessimisticFut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare " + ++ fut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare " + "[state=" + state() + ", tx=" + this + ']')); return fut; @@@ -873,13 -891,11 +874,13 @@@ * @return Future that will be completed when locks are acquired. */ @SuppressWarnings("TypeMayBeWeakened") - public IgniteInternalFuture<IgniteTxEx<K, V>> prepareAsyncLocal( - public IgniteInternalFuture<IgniteInternalTx<K, V>> prepareAsyncLocal(@Nullable Collection<IgniteTxEntry<K, V>> reads, - @Nullable Collection<IgniteTxEntry<K, V>> writes, Map<UUID, Collection<UUID>> txNodes, boolean last, - Collection<UUID> lastBackups) { - assert optimistic(); - ++ public IgniteInternalFuture<IgniteInternalTx<K, V>> prepareAsyncLocal( + @Nullable Collection<IgniteTxEntry<K, V>> reads, + @Nullable Collection<IgniteTxEntry<K, V>> writes, + Map<UUID, Collection<UUID>> txNodes, boolean last, + Collection<UUID> lastBackups, + IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb + ) { if (state() != PREPARING) { if (timedOut()) return new GridFinishedFuture<>(cctx.kernalContext(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index 6496ef6,3c90db9..cd4ea34 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@@ -18,18 -18,16 +18,21 @@@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.*; - import org.apache.ignite.client.util.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; + import org.apache.ignite.internal.cluster.*; + import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.transactions.*; + import org.apache.ignite.internal.processors.cache.version.*; + import org.apache.ignite.internal.transactions.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; @@@ -311,69 -326,65 +315,69 @@@ public final class GridNearTxPrepareFut * Waits for topology exchange future to be ready and then prepares user transaction. */ public void prepare() { - GridDhtTopologyFuture topFut = topologyReadLock(); + if (tx.optimistic()) { + GridDhtTopologyFuture topFut = topologyReadLock(); - try { - if (topFut.isDone()) { - try { - if (!tx.state(PREPARING)) { - if (tx.setRollbackOnly()) { - if (tx.timedOut()) - onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " + - "was rolled back: " + this)); + try { + if (topFut.isDone()) { + try { + if (!tx.state(PREPARING)) { + if (tx.setRollbackOnly()) { + if (tx.timedOut()) - onError(null, null, new IgniteTxTimeoutException("Transaction timed out and " + ++ onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " + + "was rolled back: " + this)); + else + onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " + + "[state=" + tx.state() + ", tx=" + this + ']')); + } else - onError(null, null, new IgniteTxRollbackException("Invalid transaction state for " + - onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " + - "[state=" + tx.state() + ", tx=" + this + ']')); ++ onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction state for " + + "prepare [state=" + tx.state() + ", tx=" + this + ']')); + + return; } - else - onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction state for " + - "prepare [state=" + tx.state() + ", tx=" + this + ']')); - return; - } + GridDiscoveryTopologySnapshot snapshot = topFut.topologySnapshot(); - GridDiscoveryTopologySnapshot snapshot = topFut.topologySnapshot(); + tx.topologyVersion(snapshot.topologyVersion()); + tx.topologySnapshot(snapshot); - tx.topologyVersion(snapshot.topologyVersion()); - tx.topologySnapshot(snapshot); + // Make sure to add future before calling prepare. + cctx.mvcc().addFuture(this); - // Make sure to add future before calling prepare. - cctx.mvcc().addFuture(this); + prepare0(); + } + catch (IgniteTxTimeoutException | IgniteTxOptimisticException e) { + onError(cctx.localNodeId(), null, e); + } + catch (IgniteCheckedException e) { + tx.setRollbackOnly(); - prepare0(); - } - catch (IgniteTxTimeoutCheckedException | IgniteTxOptimisticCheckedException e) { - onError(cctx.localNodeId(), null, e); - } - catch (IgniteCheckedException e) { - tx.setRollbackOnly(); + String msg = "Failed to prepare transaction (will attempt rollback): " + this; - String msg = "Failed to prepare transaction (will attempt rollback): " + this; + U.error(log, msg, e); - U.error(log, msg, e); + tx.rollbackAsync(); - onError(null, null, new IgniteTxRollbackException(msg, e)); - tx.rollbackAsync(); ++ onError(null, null, new IgniteTxRollbackCheckedException(msg, e)); + } + } + else { + topFut.syncNotify(false); - onError(null, null, new IgniteTxRollbackCheckedException(msg, e)); + topFut.listenAsync(new CI1<IgniteInternalFuture<Long>>() { + @Override public void apply(IgniteInternalFuture<Long> t) { + prepare(); + } + }); } } - else { - topFut.syncNotify(false); - - topFut.listenAsync(new CI1<IgniteInternalFuture<Long>>() { - @Override public void apply(IgniteInternalFuture<Long> t) { - prepare(); - } - }); + finally { + topologyReadUnlock(); } } - finally { - topologyReadUnlock(); - } + else + preparePessimistic(); } /** @@@ -469,19 -491,6 +473,19 @@@ ConcurrentLinkedDeque8<GridDistributedTxMapping<K, V>> mappings = new ConcurrentLinkedDeque8<>(); + if (!F.isEmpty(reads) || !F.isEmpty(writes)) { + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + + if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) { - onDone(new ClusterTopologyException("Failed to map keys for cache (all " + ++ onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all " + + "partition nodes left the grid): " + cacheCtx.name())); + + return; + } + } + } + // Assign keys to primary nodes. GridDistributedTxMapping<K, V> cur = null; @@@ -730,8 -646,14 +734,8 @@@ assert !tx.groupLock() : "Got group lock transaction that is mapped on remote node [tx=" + tx + ", nodeId=" + n.id() + ']'; - MiniFuture fut = new MiniFuture(m, mappings); - - req.miniId(fut.futureId()); - - add(fut); // Append new future. - try { - cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + cctx.io().send(n, req, tx.ioPolicy()); } catch (IgniteCheckedException e) { // Fail the whole thing. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index 497ed5f,9d5f73d..e3cb897 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@@ -20,12 -20,11 +20,13 @@@ package org.apache.ignite.internal.proc import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.lang.*; import org.apache.ignite.internal.processors.cache.transactions.*; - import org.apache.ignite.internal.util.direct.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.plugin.extensions.communication.*; import org.jetbrains.annotations.*; import java.io.*; @@@ -56,17 -55,9 +57,16 @@@ public class GridNearTxPrepareRequest<K /** IDs of backup nodes receiving last prepare request during this prepare. */ @GridDirectCollection(UUID.class) + @GridToStringInclude private Collection<UUID> lastBackups; + /** Need return value flag. */ + private boolean retVal; + + /** Implicit single flag. */ + private boolean implicitSingle; + /** Subject ID. */ - @GridDirectVersion(1) private UUID subjId; /** Task name hash. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c131bf04/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java ----------------------------------------------------------------------