http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index b14d45d..df2e62b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -156,9 +156,6 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter /** */ private Set<Integer> invalidParts = new GridLeanSet<>(); - /** Recover writes. */ - private Collection<IgniteTxEntry<K, V>> recoveryWrites; - /** * Transaction state. Note that state is not protected, as we want to * always use {@link #state()} and {@link #state(IgniteTxState)} @@ -189,6 +186,9 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter /** Lock condition. */ private final Condition cond = lock.newCondition(); + /** */ + protected Map<UUID, Collection<UUID>> txNodes; + /** Subject ID initiated this transaction. */ protected UUID subjId; @@ -357,8 +357,6 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter /** {@inheritDoc} */ @Override public Collection<IgniteTxEntry<K, V>> optimisticLockEntries() { - assert optimistic(); - if (!groupLock()) return writeEntries(); else { @@ -385,20 +383,6 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter } } - /** - * @param recoveryWrites Recover write entries. - */ - public void recoveryWrites(Collection<IgniteTxEntry<K, V>> recoveryWrites) { - this.recoveryWrites = recoveryWrites; - } - - /** - * @return Recover write entries. - */ - @Override public Collection<IgniteTxEntry<K, V>> recoveryWrites() { - return recoveryWrites; - } - /** {@inheritDoc} */ @Override public boolean storeEnabled() { return storeEnabled; @@ -1163,7 +1147,14 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter /** {@inheritDoc} */ @Nullable @Override public Map<UUID, Collection<UUID>> transactionNodes() { - return null; + return txNodes; + } + + /** + * @param txNodes Transaction nodes. + */ + public void transactionNodes(Map<UUID, Collection<UUID>> txNodes) { + this.txNodes = txNodes; } /** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 541e214..ca69a5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -131,9 +131,6 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, /** Group lock entry flag. */ private boolean grpLock; - /** Flag indicating if this entry should be transferred to remote node. */ - private boolean transferRequired; - /** Deployment enabled flag. */ private boolean depEnabled; @@ -280,20 +277,6 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, } /** - * @param transferRequired Sets flag indicating that transfer is required to remote node. - */ - public void transferRequired(boolean transferRequired) { - this.transferRequired = transferRequired; - } - - /** - * @return Flag indicating whether transfer is required to remote nodes. - */ - public boolean transferRequired() { - return transferRequired; - } - - /** * @param ctx Context. * @return Clean copy of this entry. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java index 321cd44..d6cec9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java @@ -303,13 +303,6 @@ public interface IgniteTxEx<K, V> extends IgniteTx, GridTimeoutObject { public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> readMap(); /** - * Gets pessimistic recovery writes, i.e. values that have never been sent to remote nodes with lock requests. - * - * @return Collection of recovery writes. - */ - public Collection<IgniteTxEntry<K, V>> recoveryWrites(); - - /** * Gets a list of entries that needs to be locked on the next step of prepare stage of * optimistic transaction. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index a3c66c9..fa1e345 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -125,20 +125,6 @@ public class IgniteTxHandler<K, V> { processCheckPreparedTxResponse(nodeId, res); } }); - - ctx.io().addHandler(0, GridCachePessimisticCheckCommittedTxRequest.class, - new CI2<UUID, GridCachePessimisticCheckCommittedTxRequest<K, V>>() { - @Override public void apply(UUID nodeId, GridCachePessimisticCheckCommittedTxRequest<K, V> req) { - processCheckCommittedTxRequest(nodeId, req); - } - }); - - ctx.io().addHandler(0, GridCachePessimisticCheckCommittedTxResponse.class, - new CI2<UUID, GridCachePessimisticCheckCommittedTxResponse<K, V>>() { - @Override public void apply(UUID nodeId, GridCachePessimisticCheckCommittedTxResponse<K, V> res) { - processCheckCommittedTxResponse(nodeId, res); - } - }); } /** @@ -248,6 +234,10 @@ public class IgniteTxHandler<K, V> { if (tx == null) U.warn(log, "Missing local transaction for mapped near version [nearVer=" + req.version() + ", mappedVer=" + mappedVer + ']'); + else { + if (req.concurrency() == PESSIMISTIC) + tx.nearFutureId(req.futureId()); + } } else { tx = new GridDhtTxLocal<>( @@ -257,8 +247,8 @@ public class IgniteTxHandler<K, V> { req.futureId(), req.miniId(), req.threadId(), - /*implicit*/false, - /*implicit-single*/false, + req.implicitSingle(), + req.implicitSingle(), req.system(), req.concurrency(), req.isolation(), @@ -279,10 +269,22 @@ public class IgniteTxHandler<K, V> { tx.topologyVersion(req.topologyVersion()); else U.warn(log, "Failed to create local transaction (was transaction rolled back?) [xid=" + - tx.xid() + ", req=" + req + ']'); + req.version() + ", req=" + req + ']'); } if (tx != null) { + tx.transactionNodes(req.transactionNodes()); + + if (req.onePhaseCommit()) { + assert req.last(); + assert F.isEmpty(req.lastBackups()) || req.lastBackups().size() <= 1; + + tx.onePhaseCommit(true); + } + + if (req.returnValue()) + tx.needReturnValue(true); + IgniteFuture<IgniteTxEx<K, V>> fut = tx.prepareAsync(req.reads(), req.writes(), req.dhtVersions(), req.messageId(), req.miniId(), req.transactionNodes(), req.last(), req.lastBackups()); @@ -327,8 +329,7 @@ public class IgniteTxHandler<K, V> { .<IgniteTxEx<K, V>>future(res.version(), res.futureId()); if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Failed to find future for prepare response [sender=" + nodeId + ", res=" + res + ']'); + U.warn(log, "Failed to find future for prepare response [sender=" + nodeId + ", res=" + res + ']'); return; } @@ -428,12 +429,8 @@ public class IgniteTxHandler<K, V> { IgniteFuture<IgniteTx> nearFinishFut = null; - if (locTx == null || locTx.nearLocallyMapped()) { - if (locTx != null) - req.cloneEntries(); - + if (locTx == null || locTx.nearLocallyMapped()) nearFinishFut = finishDhtLocal(nodeId, locTx, req); - } if (colocatedFinishFut != null && nearFinishFut != null) { GridCompoundFuture<IgniteTx, IgniteTx> res = new GridCompoundFuture<>(ctx.kernalContext()); @@ -547,20 +544,6 @@ public class IgniteTxHandler<K, V> { tx.nearFinishFutureId(req.futureId()); tx.nearFinishMiniId(req.miniId()); - tx.recoveryWrites(req.recoveryWrites()); - - Collection<IgniteTxEntry<K, V>> writeEntries = req.writes(); - - if (!F.isEmpty(writeEntries)) { - // In OPTIMISTIC mode, we get the values at PREPARE stage. - assert tx.concurrency() == PESSIMISTIC; - - for (IgniteTxEntry<K, V> entry : writeEntries) - tx.addEntry(req.messageId(), entry); - } - - if (tx.pessimistic()) - tx.prepare(); IgniteFuture<IgniteTx> commitFut = tx.commitAsync(); @@ -640,6 +623,8 @@ public class IgniteTxHandler<K, V> { assert nodeId != null; assert req != null; + assert req.transactionNodes() != null; + if (log.isDebugEnabled()) log.debug("Processing dht tx prepare request [locNodeId=" + ctx.localNodeId() + ", nodeId=" + nodeId + ", req=" + req + ']'); @@ -662,13 +647,29 @@ public class IgniteTxHandler<K, V> { if (dhtTx != null && !F.isEmpty(dhtTx.invalidPartitions())) res.invalidPartitions(dhtTx.invalidPartitions()); + + if (req.onePhaseCommit()) { + assert req.last(); + + if (dhtTx != null) { + dhtTx.onePhaseCommit(true); + + finish(nodeId, dhtTx, req); + } + + if (nearTx != null) { + nearTx.onePhaseCommit(true); + + finish(nodeId, nearTx, req); + } + } } catch (IgniteCheckedException e) { if (e instanceof IgniteTxRollbackException) - U.error(log, "Transaction was rolled back before prepare completed: " + dhtTx, e); + U.error(log, "Transaction was rolled back before prepare completed: " + req, e); else if (e instanceof IgniteTxOptimisticException) { if (log.isDebugEnabled()) - log.debug("Optimistic failure for remote transaction (will rollback): " + dhtTx); + log.debug("Optimistic failure for remote transaction (will rollback): " + req); } else U.error(log, "Failed to process prepare request: " + req, e); @@ -676,9 +677,6 @@ public class IgniteTxHandler<K, V> { if (nearTx != null) nearTx.rollback(); - if (dhtTx != null) - dhtTx.rollback(); - res = new GridDhtTxPrepareResponse<>(req.version(), req.futureId(), req.miniId(), e); } @@ -719,84 +717,36 @@ public class IgniteTxHandler<K, V> { GridDhtTxRemote<K, V> dhtTx = ctx.tm().tx(req.version()); GridNearTxRemote<K, V> nearTx = ctx.tm().nearTx(req.version()); - try { - if (dhtTx == null && !F.isEmpty(req.writes())) - dhtTx = startRemoteTxForFinish(nodeId, req); - - if (dhtTx != null) { - dhtTx.syncCommit(req.syncCommit()); - dhtTx.syncRollback(req.syncRollback()); - } - - // One-phase commit transactions send finish requests to backup nodes. - if (dhtTx != null && req.onePhaseCommit()) { - dhtTx.onePhaseCommit(true); - - dhtTx.writeVersion(req.writeVersion()); - } - - if (nearTx == null && !F.isEmpty(req.nearWrites()) && req.groupLock()) - nearTx = startNearRemoteTxForFinish(nodeId, req); - - if (nearTx != null) { - nearTx.syncCommit(req.syncCommit()); - nearTx.syncRollback(req.syncRollback()); - } - } - catch (IgniteTxRollbackException e) { - if (log.isDebugEnabled()) - log.debug("Received finish request for completed transaction (will ignore) [req=" + req + ", err=" + - e.getMessage() + ']'); - - sendReply(nodeId, req); - - return; - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to start remote DHT and Near transactions (will invalidate transactions) [dhtTx=" + - dhtTx + ", nearTx=" + nearTx + ']', e); - - if (dhtTx != null) - dhtTx.invalidate(true); - - if (nearTx != null) - nearTx.invalidate(true); - } - catch (GridDistributedLockCancelledException ignore) { - U.warn(log, "Received commit request to cancelled lock (will invalidate transaction) [dhtTx=" + - dhtTx + ", nearTx=" + nearTx + ']'); - - if (dhtTx != null) - dhtTx.invalidate(true); - - if (nearTx != null) - nearTx.invalidate(true); - } - // Safety - local transaction will finish explicitly. if (nearTx != null && nearTx.local()) nearTx = null; - finish(nodeId, dhtTx, req, req.writes(), req.ttls()); + finish(nodeId, dhtTx, req, req.ttls()); if (nearTx != null) - finish(nodeId, nearTx, req, req.nearWrites(), req.nearTtls()); + finish(nodeId, nearTx, req, req.nearTtls()); - sendReply(nodeId, req); + if (dhtTx != null && !dhtTx.done()) { + dhtTx.finishFuture().listenAsync(new CI1<IgniteFuture<IgniteTx>>() { + @Override public void apply(IgniteFuture<IgniteTx> igniteTxIgniteFuture) { + sendReply(nodeId, req); + } + }); + } + else + sendReply(nodeId, req); } /** * @param nodeId Node ID. * @param tx Transaction. * @param req Request. - * @param writes Writes. * @param ttls TTLs for optimistic transaction. */ protected void finish( UUID nodeId, IgniteTxRemoteEx<K, V> tx, GridDhtTxFinishRequest<K, V> req, - Collection<IgniteTxEntry<K, V>> writes, @Nullable GridLongList ttls) { // We don't allow explicit locks for transactions and // therefore immediately return if transaction is null. @@ -827,21 +777,7 @@ public class IgniteTxHandler<K, V> { tx.invalidate(req.isInvalidate()); tx.systemInvalidate(req.isSystemInvalidate()); - if (!F.isEmpty(writes)) { - // In OPTIMISTIC mode, we get the values at PREPARE stage. - assert tx.concurrency() == PESSIMISTIC; - - for (IgniteTxEntry<K, V> entry : writes) { - if (log.isDebugEnabled()) - log.debug("Unmarshalled transaction entry from pessimistic transaction [key=" + - entry.key() + ", value=" + entry.value() + ", tx=" + tx + ']'); - - if (!tx.setWriteValue(entry)) - U.warn(log, "Received entry to commit that was not present in transaction [entry=" + - entry + ", tx=" + tx + ']'); - } - } - else if (tx.concurrency() == OPTIMISTIC && ttls != null) { + if (tx.concurrency() == OPTIMISTIC && ttls != null) { int idx = 0; for (IgniteTxEntry<K, V> e : tx.writeEntries()) @@ -851,15 +787,10 @@ public class IgniteTxHandler<K, V> { // Complete remote candidates. tx.doneRemote(req.baseVersion(), null, null, null); - if (tx.pessimistic()) - tx.prepare(); - tx.commit(); } } else { - assert tx != null; - tx.doneRemote(req.baseVersion(), null, null, null); tx.rollback(); @@ -882,6 +813,37 @@ public class IgniteTxHandler<K, V> { } /** + * @param nodeId Node ID. + * @param tx Transaction. + * @param req Request. + */ + protected void finish( + UUID nodeId, + GridDistributedTxRemoteAdapter<K, V> tx, + GridDhtTxPrepareRequest<K, V> req) { + assert tx != null : "No transaction for one-phase commit prepare request: " + req; + + try { + tx.commitVersion(req.writeVersion()); + tx.invalidate(req.isInvalidate()); + + // Complete remote candidates. + tx.doneRemote(req.version(), null, null, null); + + tx.commit(); + } + catch (Throwable e) { + U.error(log, "Failed committing transaction [tx=" + tx + ']', e); + + // Mark transaction for invalidate. + tx.invalidate(true); + tx.systemInvalidate(true); + + tx.rollback(); + } + } + + /** * Sends tx finish response to remote node, if response is requested. * * @param nodeId Node id that originated finish request. @@ -913,9 +875,11 @@ public class IgniteTxHandler<K, V> { * @return Remote transaction. * @throws IgniteCheckedException If failed. */ - @Nullable GridDhtTxRemote<K, V> startRemoteTx(UUID nodeId, + @Nullable GridDhtTxRemote<K, V> startRemoteTx( + UUID nodeId, GridDhtTxPrepareRequest<K, V> req, - GridDhtTxPrepareResponse<K, V> res) throws IgniteCheckedException { + GridDhtTxPrepareResponse<K, V> res + ) throws IgniteCheckedException { if (!F.isEmpty(req.writes())) { GridDhtTxRemote<K, V> tx = ctx.tm().tx(req.version()); @@ -930,7 +894,7 @@ public class IgniteTxHandler<K, V> { req.threadId(), req.topologyVersion(), req.version(), - req.commitVersion(), + null, req.system(), req.concurrency(), req.isolation(), @@ -943,6 +907,8 @@ public class IgniteTxHandler<K, V> { req.subjectId(), req.taskNameHash()); + tx.writeVersion(req.writeVersion()); + tx = ctx.tm().onCreated(tx); if (tx == null || !ctx.tm().onStarted(tx)) { @@ -1047,7 +1013,7 @@ public class IgniteTxHandler<K, V> { req.nearNodeId(), req.threadId(), req.version(), - req.commitVersion(), + null, req.system(), req.concurrency(), req.isolation(), @@ -1060,6 +1026,8 @@ public class IgniteTxHandler<K, V> { req.taskNameHash() ); + tx.writeVersion(req.writeVersion()); + if (!tx.empty()) { tx = ctx.tm().onCreated(tx); @@ -1076,6 +1044,9 @@ public class IgniteTxHandler<K, V> { // in prepare phase will get properly ordered as well. tx.prepare(); + if (req.last()) + tx.state(PREPARED); + return tx; } @@ -1083,312 +1054,6 @@ public class IgniteTxHandler<K, V> { } /** - * @param nodeId Primary node ID. - * @param req Request. - * @return Remote transaction. - * @throws IgniteCheckedException If failed. - * @throws GridDistributedLockCancelledException If lock has been cancelled. - */ - @SuppressWarnings({"RedundantTypeArguments"}) - @Nullable GridDhtTxRemote<K, V> startRemoteTxForFinish(UUID nodeId, GridDhtTxFinishRequest<K, V> req) - throws IgniteCheckedException, GridDistributedLockCancelledException { - - GridDhtTxRemote<K, V> tx = null; - - boolean marked = false; - - for (IgniteTxEntry<K, V> txEntry : req.writes()) { - GridDistributedCacheEntry<K, V> entry = null; - - GridCacheContext<K, V> cacheCtx = txEntry.context(); - - while (true) { - try { - int part = cacheCtx.affinity().partition(txEntry.key()); - - GridDhtLocalPartition<K, V> locPart = cacheCtx.topology().localPartition(part, - req.topologyVersion(), false); - - // Handle implicit locks for pessimistic transactions. - if (tx == null) - tx = ctx.tm().tx(req.version()); - - if (locPart == null || !locPart.reserve()) { - if (log.isDebugEnabled()) - log.debug("Local partition for given key is already evicted (will remove from tx) " + - "[key=" + txEntry.key() + ", part=" + part + ", locPart=" + locPart + ']'); - - if (tx != null) - tx.clearEntry(txEntry.txKey()); - - break; - } - - try { - entry = (GridDistributedCacheEntry<K, V>)cacheCtx.cache().entryEx(txEntry.key(), - req.topologyVersion()); - - if (tx == null) { - tx = new GridDhtTxRemote<>( - ctx, - req.nearNodeId(), - req.futureId(), - nodeId, - // We can pass null as nearXidVersion as transaction will be committed right away. - null, - req.threadId(), - req.topologyVersion(), - req.version(), - /*commitVer*/null, - req.system(), - PESSIMISTIC, - req.isolation(), - req.isInvalidate(), - 0, - req.txSize(), - req.groupLockKey(), - req.subjectId(), - req.taskNameHash()); - - tx = ctx.tm().onCreated(tx); - - if (tx == null || !ctx.tm().onStarted(tx)) - throw new IgniteTxRollbackException("Failed to acquire lock " + - "(transaction has been completed): " + req.version()); - } - - tx.addWrite(cacheCtx, - txEntry.op(), - txEntry.txKey(), - txEntry.keyBytes(), - txEntry.value(), - txEntry.valueBytes(), - txEntry.entryProcessors(), - txEntry.drVersion(), - txEntry.ttl()); - - if (!marked) { - if (tx.markFinalizing(USER_FINISH)) - marked = true; - else { - tx.clearEntry(txEntry.txKey()); - - return null; - } - } - - // Add remote candidate before reordering. - if (txEntry.explicitVersion() == null && !txEntry.groupLockEntry()) - entry.addRemote( - req.nearNodeId(), - nodeId, - req.threadId(), - req.version(), - 0, - /*tx*/true, - tx.implicitSingle(), - null - ); - - // Double-check in case if sender node left the grid. - if (ctx.discovery().node(req.nearNodeId()) == null) { - if (log.isDebugEnabled()) - log.debug("Node requesting lock left grid (lock request will be ignored): " + req); - - tx.rollback(); - - return null; - } - - // Entry is legit. - break; - } - finally { - locPart.release(); - } - } - catch (GridCacheEntryRemovedException ignored) { - assert entry.obsoleteVersion() != null : "Obsolete flag not set on removed entry: " + - entry; - - if (log.isDebugEnabled()) - log.debug("Received entry removed exception (will retry on renewed entry): " + entry); - - tx.clearEntry(txEntry.txKey()); - - if (log.isDebugEnabled()) - log.debug("Cleared removed entry from remote transaction (will retry) [entry=" + - entry + ", tx=" + tx + ']'); - } - catch (GridDhtInvalidPartitionException p) { - if (log.isDebugEnabled()) - log.debug("Received invalid partition (will clear entry from tx) [part=" + p + ", req=" + - req + ", txEntry=" + txEntry + ']'); - - if (tx != null) - tx.clearEntry(txEntry.txKey()); - - break; - } - } - } - - if (tx != null && tx.empty()) { - tx.rollback(); - - return null; - } - - return tx; - } - - /** - * @param nodeId Primary node ID. - * @param req Request. - * @return Remote transaction. - * @throws IgniteCheckedException If failed. - * @throws GridDistributedLockCancelledException If lock has been cancelled. - */ - @SuppressWarnings({"RedundantTypeArguments"}) - @Nullable public GridNearTxRemote<K, V> startNearRemoteTxForFinish(UUID nodeId, GridDhtTxFinishRequest<K, V> req) - throws IgniteCheckedException, GridDistributedLockCancelledException { - assert req.groupLock(); - - GridNearTxRemote<K, V> tx = null; - - ClassLoader ldr = ctx.deploy().globalLoader(); - - if (ldr != null) { - boolean marked = false; - - for (IgniteTxEntry<K, V> txEntry : req.nearWrites()) { - GridDistributedCacheEntry<K, V> entry = null; - - GridCacheContext<K, V> cacheCtx = txEntry.context(); - - while (true) { - try { - entry = cacheCtx.near().peekExx(txEntry.key()); - - if (entry != null) { - entry.keyBytes(txEntry.keyBytes()); - - // Handle implicit locks for pessimistic transactions. - if (tx == null) - tx = ctx.tm().nearTx(req.version()); - - if (tx == null) { - tx = new GridNearTxRemote<>( - ctx, - nodeId, - req.nearNodeId(), - // We can pass null as nearXidVer as transaction will be committed right away. - null, - req.threadId(), - req.version(), - null, - req.system(), - PESSIMISTIC, - req.isolation(), - req.isInvalidate(), - 0, - req.txSize(), - req.groupLockKey(), - req.subjectId(), - req.taskNameHash()); - - tx = ctx.tm().onCreated(tx); - - if (tx == null || !ctx.tm().onStarted(tx)) - throw new IgniteTxRollbackException("Failed to acquire lock " + - "(transaction has been completed): " + req.version()); - - if (!marked) - marked = tx.markFinalizing(USER_FINISH); - - if (!marked) - return null; - } - - if (tx.local()) - return null; - - if (!marked) - marked = tx.markFinalizing(USER_FINISH); - - if (marked) - tx.addEntry(cacheCtx, txEntry.txKey(), txEntry.keyBytes(), txEntry.op(), txEntry.value(), - txEntry.valueBytes(), txEntry.drVersion()); - else - return null; - - if (req.groupLock()) { - tx.markGroupLock(); - - if (!txEntry.groupLockEntry()) - tx.groupLockKey(txEntry.txKey()); - } - - // Add remote candidate before reordering. - if (txEntry.explicitVersion() == null && !txEntry.groupLockEntry()) - entry.addRemote( - req.nearNodeId(), - nodeId, - req.threadId(), - req.version(), - 0, - /*tx*/true, - tx.implicitSingle(), - null - ); - } - - // Double-check in case if sender node left the grid. - if (ctx.discovery().node(req.nearNodeId()) == null) { - if (log.isDebugEnabled()) - log.debug("Node requesting lock left grid (lock request will be ignored): " + req); - - if (tx != null) - tx.rollback(); - - return null; - } - - // Entry is legit. - break; - } - catch (GridCacheEntryRemovedException ignored) { - assert entry.obsoleteVersion() != null : "Obsolete flag not set on removed entry: " + - entry; - - if (log.isDebugEnabled()) - log.debug("Received entry removed exception (will retry on renewed entry): " + entry); - - if (tx != null) { - tx.clearEntry(txEntry.txKey()); - - if (log.isDebugEnabled()) - log.debug("Cleared removed entry from remote transaction (will retry) [entry=" + - entry + ", tx=" + tx + ']'); - } - - // Will retry in while loop. - } - } - } - } - else { - String err = "Failed to acquire deployment class loader for message: " + req; - - U.warn(log, err); - - throw new IgniteCheckedException(err); - } - - return tx; - } - - /** * @param nodeId Node ID. * @param req Request. */ @@ -1437,83 +1102,4 @@ public class IgniteTxHandler<K, V> { fut.onResult(nodeId, res); } - - /** - * @param nodeId Node ID. - * @param req Request. - */ - protected void processCheckCommittedTxRequest(final UUID nodeId, - final GridCachePessimisticCheckCommittedTxRequest<K, V> req) { - if (log.isDebugEnabled()) - log.debug("Processing check committed transaction request [nodeId=" + nodeId + ", req=" + req + ']'); - - IgniteFuture<GridCacheCommittedTxInfo<K, V>> infoFut = ctx.tm().checkPessimisticTxCommitted(req); - - infoFut.listenAsync(new CI1<IgniteFuture<GridCacheCommittedTxInfo<K, V>>>() { - @Override public void apply(IgniteFuture<GridCacheCommittedTxInfo<K, V>> infoFut) { - GridCacheCommittedTxInfo<K, V> info = null; - - try { - info = infoFut.get(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to obtain committed info for transaction (will rollback): " + req, e); - } - - GridCachePessimisticCheckCommittedTxResponse<K, V> - res = new GridCachePessimisticCheckCommittedTxResponse<>( - req.version(), req.futureId(), req.miniId(), info); - - if (log.isDebugEnabled()) - log.debug("Finished waiting for tx committed info [req=" + req + ", res=" + res + ']'); - - sendCheckCommittedResponse(nodeId, res); } - }); - } - - /** - * @param nodeId Node ID. - * @param res Response. - */ - protected void processCheckCommittedTxResponse(UUID nodeId, - GridCachePessimisticCheckCommittedTxResponse<K, V> res) { - if (log.isDebugEnabled()) - log.debug("Processing check committed transaction response [nodeId=" + nodeId + ", res=" + res + ']'); - - GridCachePessimisticCheckCommittedTxFuture<K, V> fut = - (GridCachePessimisticCheckCommittedTxFuture<K, V>)ctx.mvcc().<GridCacheCommittedTxInfo<K, V>>future( - res.version(), res.futureId()); - - if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Received response for unknown future (will ignore): " + res); - - return; - } - - fut.onResult(nodeId, res); - } - - /** - * Sends check committed response to remote node. - * - * @param nodeId Node ID to send to. - * @param res Reponse to send. - */ - private void sendCheckCommittedResponse(UUID nodeId, GridCachePessimisticCheckCommittedTxResponse<K, V> res) { - try { - if (log.isDebugEnabled()) - log.debug("Sending check committed transaction response [nodeId=" + nodeId + ", res=" + res + ']'); - - ctx.io().send(nodeId, res); - } - catch (ClusterTopologyException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send check committed transaction response (did node leave grid?) [nodeId=" + - nodeId + ", res=" + res + ']'); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send response to node [nodeId=" + nodeId + ", res=" + res + ']', e); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index b0934f1..d1a2be5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.transactions; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.lang.*; @@ -93,6 +94,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> /** Active cache IDs. */ protected Set<Integer> activeCacheIds = new HashSet<>(); + /** Need return value. */ + protected boolean needRetVal; + + /** Implicit transaction result. */ + protected GridCacheReturn<V> implicitRes = new GridCacheReturn<>(false); + /** * Empty constructor required for {@link Externalizable}. */ @@ -264,6 +271,32 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> writeView.seal(); } + /** {@inheritDoc} */ + @Override public GridCacheReturn<V> implicitSingleResult() { + return implicitRes; + } + + /** + * @param ret Result. + */ + public void implicitSingleResult(GridCacheReturn<V> ret) { + implicitRes = ret; + } + + /** + * @return Flag indicating whether transaction needs return value. + */ + public boolean needReturnValue() { + return needRetVal; + } + + /** + * @param needRetVal Need return value flag. + */ + public void needReturnValue(boolean needRetVal) { + this.needRetVal = needRetVal; + } + /** * @param snd {@code True} if values in tx entries should be replaced with transformed values and sent * to remote nodes. @@ -650,9 +683,6 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> addGroupTxMapping(writeSet()); if (!empty) { - // We are holding transaction-level locks for entries here, so we can get next write version. - writeVersion(cctx.versions().next(topologyVersion())); - batchStoreCommit(writeMap().values()); try { @@ -778,7 +808,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> evt, metrics, topVer, - txEntry.filters(), + null, cached.detached() ? DR_NONE : drType, txEntry.drExpireTime(), cached.isNear() ? null : explicitVer, @@ -815,7 +845,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> evt, metrics, topVer, - txEntry.filters(), + null, cached.detached() ? DR_NONE : drType, cached.isNear() ? null : explicitVer, CU.subjectId(this, cctx), @@ -1856,7 +1886,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> */ private boolean filter(GridCacheEntryEx<K, V> cached, IgnitePredicate<CacheEntry<K, V>>[] filter) throws IgniteCheckedException { - return pessimistic() || cached.context().isAll(cached, filter); + return pessimistic() || (optimistic() && implicit()) || cached.context().isAll(cached, filter); } /** @@ -1994,7 +2024,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> boolean readThrough = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter); - if (optimistic()) { + if (optimistic() && !implicit()) { try { // Should read through if filter is specified. old = entry.innerGet(this, @@ -2011,6 +2041,11 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> CU.<K, V>empty(), null); } + catch (ClusterTopologyException e) { + entry.context().evicts().touch(entry, topologyVersion()); + + throw e; + } catch (GridCacheFilterFailedException e) { e.printStackTrace(); @@ -2072,7 +2107,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> enlisted.add(key); - if (!pessimistic() || (groupLock() && !lockOnly)) { + if ((!pessimistic() && !implicit()) || (groupLock() && !lockOnly)) { txEntry.markValid(); if (old == null) { @@ -2463,11 +2498,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> @Nullable final Map<? extends K, GridCacheDrInfo<V>> drMap, final boolean retval, @Nullable GridCacheEntryEx<K, V> cached, - @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter) { + @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter + ) { assert filter == null || invokeMap == null; cacheCtx.checkSecurity(GridSecurityPermission.CACHE_PUT); + if (retval) + needReturnValue(true); + // Cached entry may be passed only from entry wrapper. final Map<K, V> map0; final Map<K, EntryProcessor<K, V, Object>> invokeMap0; @@ -2651,13 +2690,34 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> cctx.kernalContext()); } else { - return loadFut.chain(new CX1<IgniteFuture<Set<K>>, GridCacheReturn<V>>() { - @Override public GridCacheReturn<V> applyx(IgniteFuture<Set<K>> f) throws IgniteCheckedException { - f.get(); + if (implicit()) { + // Should never load missing values for implicit transaction as values will be returned + // with prepare response, if required. + assert loadFut.isDone(); - return ret; + try { + loadFut.get(); } - }); + catch (IgniteCheckedException e) { + return new GridFinishedFutureEx<>(new GridCacheReturn<V>(), e); + } + + return commitAsync().chain(new CX1<IgniteFuture<IgniteTx>, GridCacheReturn<V>>() { + @Override public GridCacheReturn<V> applyx(IgniteFuture<IgniteTx> txFut) throws IgniteCheckedException { + txFut.get(); + + return implicitRes; + } + }); + } + else + return loadFut.chain(new CX1<IgniteFuture<Set<K>>, GridCacheReturn<V>>() { + @Override public GridCacheReturn<V> applyx(IgniteFuture<Set<K>> f) throws IgniteCheckedException { + f.get(); + + return ret; + } + }); } } catch (IgniteCheckedException e) { @@ -2696,6 +2756,9 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter) { cacheCtx.checkSecurity(GridSecurityPermission.CACHE_REMOVE); + if (retval) + needReturnValue(true); + final Collection<? extends K> keys0; if (drMap != null) { @@ -2851,13 +2914,27 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> cctx.kernalContext()); } else { - return loadFut.chain(new CX1<IgniteFuture<Set<K>>, GridCacheReturn<V>>() { - @Override public GridCacheReturn<V> applyx(IgniteFuture<Set<K>> f) throws IgniteCheckedException { - f.get(); + if (implicit()) { + // Should never load missing values for implicit transaction as values will be returned + // with prepare response, if required. + assert loadFut.isDone(); - return ret; - } - }); + return commitAsync().chain(new CX1<IgniteFuture<IgniteTx>, GridCacheReturn<V>>() { + @Override public GridCacheReturn<V> applyx(IgniteFuture<IgniteTx> txFut) throws IgniteCheckedException { + txFut.get(); + + return implicitRes; + } + }); + } + else + return loadFut.chain(new CX1<IgniteFuture<Set<K>>, GridCacheReturn<V>>() { + @Override public GridCacheReturn<V> applyx(IgniteFuture<Set<K>> f) throws IgniteCheckedException { + f.get(); + + return ret; + } + }); } } catch (IgniteCheckedException e) { @@ -3168,7 +3245,6 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } txEntry.filtersSet(filtersSet); - txEntry.transferRequired(true); while (true) { try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index 22bf372..267e43d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -153,6 +153,11 @@ public interface IgniteTxLocalEx<K, V> extends IgniteTxEx<K, V> { public boolean partitionLock(); /** + * @return Return value for + */ + public GridCacheReturn<V> implicitSingleResult(); + + /** * Finishes transaction (either commit or rollback). * * @param commit {@code True} if commit, {@code false} if rollback. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index fae1989..df9f409 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -87,15 +87,8 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { new ConcurrentSkipListMap<>(); /** Committed local transactions. */ - private final GridBoundedConcurrentOrderedSet<GridCacheVersion> committedVers = - new GridBoundedConcurrentOrderedSet<>(Integer.getInteger(GG_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT)); - - /** Rolled back local transactions. */ - private final NavigableSet<GridCacheVersion> rolledbackVers = - new GridBoundedConcurrentOrderedSet<>(Integer.getInteger(GG_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT)); - - /** Pessimistic commit buffer. */ - private GridCacheTxCommitBuffer<K, V> pessimisticRecoveryBuf; + private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> completedVers = + new GridBoundedConcurrentOrderedMap<>(Integer.getInteger(GG_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT)); /** Transaction synchronizations. */ private final Collection<IgniteTxSynchronization> syncs = @@ -147,8 +140,6 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { - pessimisticRecoveryBuf = new GridCachePerThreadTxCommitBuffer<>(cctx); - txFinishSync = new GridCacheTxFinishSync<>(cctx); txHandler = new IgniteTxHandler<>(cctx); @@ -290,11 +281,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { X.println(">>> prepareQueueSize: " + prepareQ.size()); X.println(">>> startVerCntsSize [size=" + startVerCnts.size() + ", firstVer=" + startVerEntry + ']'); - X.println(">>> committedVersSize: " + committedVers.size()); - X.println(">>> rolledbackVersSize: " + rolledbackVers.size()); - - if (pessimisticRecoveryBuf != null) - X.println(">>> pessimsticCommitBufSize: " + pessimisticRecoveryBuf.size()); + X.println(">>> completedVersSize: " + completedVers.size()); } /** @@ -335,15 +322,8 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { /** * @return Committed versions size. */ - public int committedVersionsSize() { - return committedVers.size(); - } - - /** - * @return Rolled back versions size. - */ - public int rolledbackVersionsSize() { - return rolledbackVers.size(); + public int completedVersionsSize() { + return completedVers.size(); } /** @@ -353,7 +333,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { * {@code false} otherwise. */ public boolean isCompleted(IgniteTxEx<K, V> tx) { - return committedVers.contains(tx.xidVersion()) || rolledbackVers.contains(tx.xidVersion()); + return completedVers.containsKey(tx.xidVersion()); } /** @@ -753,7 +733,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { boolean txSerializableEnabled = cctx.txConfig().isTxSerializableEnabled(); // Clean up committed transactions queue. - if (tx.pessimistic()) { + if (tx.pessimistic() && tx.local()) { if (tx.enforceSerializable() && txSerializableEnabled) { for (Iterator<IgniteTxEx<K, V>> it = committedQ.iterator(); it.hasNext();) { IgniteTxEx<K, V> committedTx = it.next(); @@ -863,7 +843,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { } // Optimistic. - assert tx.optimistic(); + assert tx.optimistic() || !tx.local(); if (!lockMultiple(tx, tx.optimisticLockEntries())) { tx.setRollbackOnly(); @@ -943,14 +923,17 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { } /** - * @param c Collection to copy. + * @param map Collection to copy. + * @param expVal Values to copy. * @return Copy of the collection. */ - private Collection<GridCacheVersion> copyOf(Iterable<GridCacheVersion> c) { + private Collection<GridCacheVersion> copyOf(Map<GridCacheVersion, Boolean> map, boolean expVal) { Collection<GridCacheVersion> l = new LinkedList<>(); - for (GridCacheVersion v : c) - l.add(v); + for (Map.Entry<GridCacheVersion, Boolean> e : map.entrySet()) { + if (e.getValue() == expVal) + l.add(e.getKey()); + } return l; } @@ -962,9 +945,10 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { * @return Committed transactions starting from the given version (non-inclusive). */ public Collection<GridCacheVersion> committedVersions(GridCacheVersion min) { - Set<GridCacheVersion> set = committedVers.tailSet(min, true); + ConcurrentNavigableMap<GridCacheVersion, Boolean> tail + = completedVers.tailMap(min, true); - return set == null || set.isEmpty() ? Collections.<GridCacheVersion>emptyList() : copyOf(set); + return F.isEmpty(tail) ? Collections.<GridCacheVersion>emptyList() : copyOf(tail, true); } /** @@ -974,16 +958,17 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { * @return Committed transactions starting from the given version (non-inclusive). */ public Collection<GridCacheVersion> rolledbackVersions(GridCacheVersion min) { - Set<GridCacheVersion> set = rolledbackVers.tailSet(min, true); + ConcurrentNavigableMap<GridCacheVersion, Boolean> tail + = completedVers.tailMap(min, true); - return set == null || set.isEmpty() ? Collections.<GridCacheVersion>emptyList() : copyOf(set); + return F.isEmpty(tail) ? Collections.<GridCacheVersion>emptyList() : copyOf(tail, false); } /** * @param tx Tx to remove. */ public void removeCommittedTx(IgniteTxEx<K, V> tx) { - committedVers.remove(tx.xidVersion()); + completedVers.remove(tx.xidVersion(), true); } /** @@ -1008,12 +993,12 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { * @return If transaction was not already present in completed set. */ public boolean addCommittedTx(GridCacheVersion xidVer, @Nullable GridCacheVersion nearXidVer) { - assert !rolledbackVers.contains(xidVer) : "Version was rolled back: " + xidVer; - if (nearXidVer != null) xidVer = new CommittedVersion(xidVer, nearXidVer); - if (committedVers.add(xidVer)) { + Boolean committed = completedVers.putIfAbsent(xidVer, true); + + if (committed == null || committed) { if (log.isDebugEnabled()) log.debug("Added transaction to committed version set: " + xidVer); @@ -1021,7 +1006,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { } else { if (log.isDebugEnabled()) - log.debug("Transaction is already present in committed version set: " + xidVer); + log.debug("Transaction is already present in rolled back version set: " + xidVer); return false; } @@ -1032,9 +1017,9 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { * @return If transaction was not already present in completed set. */ public boolean addRolledbackTx(GridCacheVersion xidVer) { - assert !committedVers.contains(xidVer); + Boolean committed = completedVers.putIfAbsent(xidVer, false); - if (rolledbackVers.add(xidVer)) { + if (committed == null || !committed) { if (log.isDebugEnabled()) log.debug("Added transaction to rolled back version set: " + xidVer); @@ -1042,7 +1027,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { } else { if (log.isDebugEnabled()) - log.debug("Transaction is already present in rolled back version set: " + xidVer); + log.debug("Transaction is already present in committed version set: " + xidVer); return false; } @@ -1172,13 +1157,15 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { * so we don't do it here. */ + Boolean committed = completedVers.get(tx.xidVersion()); + // 1. Make sure that committed version has been recorded. - if (!(committedVers.contains(tx.xidVersion()) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) { + if (!(committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate()) { uncommitTx(tx); throw new IgniteException("Missing commit version (consider increasing " + GG_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + ", firstVer=" + - committedVers.firstx() + ", lastVer=" + committedVers.lastx() + ", tx=" + tx.xid() + ']'); + completedVers.firstKey() + ", lastVer=" + completedVers.lastKey() + ", tx=" + tx.xid() + ']'); } ConcurrentMap<GridCacheVersion, IgniteTxEx<K, V>> txIdMap = transactionMap(tx); @@ -1197,9 +1184,6 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) cacheCtx.dataStructures().onTxCommitted(tx); - // 3.2 Add to pessimistic commit buffer if needed. - addPessimisticRecovery(tx); - // 4. Unlock write resources. if (tx.groupLock()) unlockGroupLocks(tx); @@ -1513,7 +1497,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { */ private boolean lockMultiple(IgniteTxEx<K, V> tx, Iterable<IgniteTxEntry<K, V>> entries) throws IgniteCheckedException { - assert tx.optimistic(); + assert tx.optimistic() || !tx.local(); long remainingTime = U.currentTimeMillis() - (tx.startTime() + tx.timeout()); @@ -1524,7 +1508,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { for (IgniteTxEntry<K, V> txEntry1 : entries) { // Check if this entry was prepared before. - if (!txEntry1.markPrepared()) + if (!txEntry1.markPrepared() || txEntry1.explicitVersion() != null) continue; GridCacheContext<K, V> cacheCtx = txEntry1.context(); @@ -1638,11 +1622,11 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { try { GridCacheEntryEx<K, V> entry = txEntry.cached(); + assert entry != null; + if (entry.detached()) break; - assert entry != null; - entry.txUnlock(tx); break; @@ -1803,7 +1787,12 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { // Not all transactions were found. Need to scan committed versions to check // if transaction was already committed. - for (GridCacheVersion ver : committedVers) { + for (Map.Entry<GridCacheVersion, Boolean> e : completedVers.entrySet()) { + if (!e.getValue()) + continue; + + GridCacheVersion ver = e.getKey(); + if (processedVers != null && processedVers.contains(ver)) continue; @@ -1821,37 +1810,6 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { } /** - * Adds transaction to pessimistic recovery buffer if needed. - * - * @param tx Committed transaction to add. - */ - private void addPessimisticRecovery(IgniteTxEx<K, V> tx) { - if (pessimisticRecoveryBuf == null) - return; - - // Do not store recovery information for optimistic or replicated local transactions. - if (tx.optimistic() || (tx.local() && tx.replicated())) - return; - - pessimisticRecoveryBuf.addCommittedTx(tx); - } - - /** - * Checks whether transaction with given near version was committed on this node and returns commit info. - * - * @param nearTxVer Near tx version. - * @param originatingNodeId Originating node ID. - * @param originatingThreadId Originating thread ID. - * @return Commit info, if present. - */ - @Nullable public GridCacheCommittedTxInfo<K, V> txCommitted(GridCacheVersion nearTxVer, - UUID originatingNodeId, long originatingThreadId) { - assert pessimisticRecoveryBuf != null : "Should not be called for LOCAL cache."; - - return pessimisticRecoveryBuf.committedTx(nearTxVer, originatingNodeId, originatingThreadId); - } - - /** * Gets local transaction for pessimistic tx recovery. * * @param nearXidVer Near tx ID. @@ -1952,8 +1910,6 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { continue; } - ((IgniteTxAdapter<K, V>)tx).recoveryWrites(commitInfo.recoveryWrites()); - // If write was not found, check read. IgniteTxEntry<K, V> read = tx.readMap().remove(entry.txKey()); @@ -1974,45 +1930,6 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { } /** - * @param req Check committed request. - * @return Check committed future. - */ - public IgniteFuture<GridCacheCommittedTxInfo<K, V>> checkPessimisticTxCommitted(GridCachePessimisticCheckCommittedTxRequest req) { - // First check if we have near transaction with this ID. - IgniteTxEx<K, V> tx = localTxForRecovery(req.nearXidVersion(), !req.nearOnlyCheck()); - - // Either we found near transaction or one of transactions is being committed by user. - // Wait for it and send reply. - if (tx != null) { - assert tx.local(); - - if (log.isDebugEnabled()) - log.debug("Found active near transaction, will wait for completion [req=" + req + ", tx=" + tx + ']'); - - final IgniteTxEx<K, V> tx0 = tx; - - return tx.finishFuture().chain(new C1<IgniteFuture<IgniteTx>, GridCacheCommittedTxInfo<K, V>>() { - @Override public GridCacheCommittedTxInfo<K, V> apply(IgniteFuture<IgniteTx> txFut) { - GridCacheCommittedTxInfo<K, V> info = null; - - if (tx0.state() == COMMITTED) - info = new GridCacheCommittedTxInfo<>(tx0); - - return info; - } - }); - } - - GridCacheCommittedTxInfo<K, V> info = txCommitted(req.nearXidVersion(), req.originatingNodeId(), - req.originatingThreadId()); - - if (info == null) - info = txCommitted(req.nearXidVersion(), req.originatingNodeId(), req.originatingThreadId()); - - return new GridFinishedFutureEx<>(info); - } - - /** * Timeout object for node failure handler. */ private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter { @@ -2050,7 +1967,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { // Invalidate transactions. salvageTx(tx, false, RECOVERY_FINISH); } - else if (tx.optimistic()) { + else { // Check prepare only if originating node ID failed. Otherwise parent node will finish this tx. if (tx.originatingNodeId().equals(evtNodeId)) { if (tx.state() == PREPARED) @@ -2062,23 +1979,6 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { } } } - else { - // Pessimistic. - if (tx.originatingNodeId().equals(evtNodeId)) { - if (tx.state() != COMMITTING && tx.state() != COMMITTED) - commitIfRemotelyCommitted(tx); - else { - if (log.isDebugEnabled()) - log.debug("Skipping pessimistic transaction check (transaction is being committed) " + - "[tx=" + tx + ", locNodeId=" + cctx.localNodeId() + ']'); - } - } - else { - if (log.isDebugEnabled()) - log.debug("Skipping pessimistic transaction check [tx=" + tx + - ", evtNodeId=" + evtNodeId + ", locNodeId=" + cctx.localNodeId() + ']'); - } - } } } finally { @@ -2097,7 +1997,6 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { assert !F.isEmpty(tx.transactionNodes()); assert tx.nearXidVersion() != null; - GridCacheOptimisticCheckPreparedTxFuture<K, V> fut = new GridCacheOptimisticCheckPreparedTxFuture<>( cctx, tx, evtNodeId, tx.transactionNodes()); @@ -2108,25 +2007,6 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { fut.prepare(); } - - /** - * Commits pessimistic transaction if at least one of remote nodes has committed this transaction. - * - * @param tx Transaction. - */ - private void commitIfRemotelyCommitted(IgniteTxEx<K, V> tx) { - assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote : tx; - - GridCachePessimisticCheckCommittedTxFuture<K, V> fut = new GridCachePessimisticCheckCommittedTxFuture<>( - cctx, tx, evtNodeId); - - cctx.mvcc().addFuture(fut); - - if (log.isDebugEnabled()) - log.debug("Checking pessimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']'); - - fut.prepare(); - } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxProxyImpl.java index 3ac8227..3a46734 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxProxyImpl.java @@ -71,6 +71,13 @@ public class IgniteTxProxyImpl<K, V> implements IgniteTxProxy, Externalizable { } /** + * @return Transaction. + */ + public IgniteTxEx<K, V> tx() { + return tx; + } + + /** * Enters a call. */ private void enter() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java index 265e2e2..549a18e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java @@ -189,10 +189,10 @@ public class GridBoundedConcurrentOrderedMap<K, V> extends ConcurrentSkipListMap if (lsnr != null) lsnr.apply(key, val); } - catch (NoSuchElementException e1) { - e1.printStackTrace(); // Should never happen. + catch (NoSuchElementException ignored) { + cnt.incrementAndGet(); - assert false : "Internal error in grid bounded ordered set."; + return; } } } @@ -226,7 +226,12 @@ public class GridBoundedConcurrentOrderedMap<K, V> extends ConcurrentSkipListMap * @return {@inheritDoc} */ @Override public V remove(Object o) { - throw new UnsupportedOperationException("Remove is not supported on concurrent bounded map."); + V old = super.remove(o); + + if (old != null) + cnt.decrementAndGet(); + + return old; } /** @@ -237,6 +242,11 @@ public class GridBoundedConcurrentOrderedMap<K, V> extends ConcurrentSkipListMap * @return {@inheritDoc} */ @Override public boolean remove(Object key, Object val) { - throw new UnsupportedOperationException("Remove is not supported on concurrent bounded map."); + boolean rmvd = super.remove(key, val); + + if (rmvd) + cnt.decrementAndGet(); + + return rmvd; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 7f17746..d92ad4d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -461,11 +461,17 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract public void testGetAll() throws Exception { IgniteTx tx = txEnabled() ? cache().txStart() : null; - cache().put("key1", 1); - cache().put("key2", 2); + try { + cache().put("key1", 1); + cache().put("key2", 2); - if (tx != null) - tx.commit(); + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } assert cache().getAll(null).isEmpty(); assert cache().getAll(Collections.<String>emptyList()).isEmpty(); @@ -494,30 +500,36 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract if (txEnabled()) { tx = cache().txStart(); - assert cache().getAll(null).isEmpty(); - assert cache().getAll(Collections.<String>emptyList()).isEmpty(); + try { + assert cache().getAll(null).isEmpty(); + assert cache().getAll(Collections.<String>emptyList()).isEmpty(); - map1 = cache().getAll(F.asList("key1", "key2", "key9999")); + map1 = cache().getAll(F.asList("key1", "key2", "key9999")); - info("Retrieved map1: " + map1); + info("Retrieved map1: " + map1); - assert 2 == map1.size() : "Invalid map: " + map1; + assert 2 == map1.size() : "Invalid map: " + map1; - assertEquals(1, (int)map1.get("key1")); - assertEquals(2, (int)map1.get("key2")); - assertNull(map1.get("key9999")); + assertEquals(1, (int)map1.get("key1")); + assertEquals(2, (int)map1.get("key2")); + assertNull(map1.get("key9999")); - map2 = cache().getAll(F.asList("key1", "key2", "key9999")); + map2 = cache().getAll(F.asList("key1", "key2", "key9999")); - info("Retrieved map2: " + map2); + info("Retrieved map2: " + map2); - assert 2 == map2.size() : "Invalid map: " + map2; + assert 2 == map2.size() : "Invalid map: " + map2; - assertEquals(1, (int)map2.get("key1")); - assertEquals(2, (int)map2.get("key2")); - assertNull(map2.get("key9999")); + assertEquals(1, (int)map2.get("key1")); + assertEquals(2, (int)map2.get("key2")); + assertNull(map2.get("key9999")); - tx.commit(); + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } } } @@ -571,14 +583,17 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract if (txEnabled()) { IgniteTx tx = cache().txStart(); - cache().put("key1", 100); - cache().put("key2", 101); - cache().put("key3", 200); - cache().put("key4", 201); - - tx.commit(); + try { + cache().put("key1", 100); + cache().put("key2", 101); + cache().put("key3", 200); + cache().put("key4", 201); - tx.close(); + tx.commit(); + } + finally { + tx.close(); + } tx = cache().txStart(PESSIMISTIC, REPEATABLE_READ); @@ -731,27 +746,27 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract */ public void testPutTx() throws Exception { if (txEnabled()) { - IgniteTx tx = cache().txStart(); - - assert cache().put("key1", 1) == null; - assert cache().put("key2", 2) == null; + try (IgniteTx tx = cache().txStart()) { + assert cache().put("key1", 1) == null; + assert cache().put("key2", 2) == null; - // Check inside transaction. - assert cache().get("key1") == 1; - assert cache().get("key2") == 2; + // Check inside transaction. + assert cache().get("key1") == 1; + assert cache().get("key2") == 2; - // Put again to check returned values. - assert cache().put("key1", 1) == 1; - assert cache().put("key2", 2) == 2; + // Put again to check returned values. + assert cache().put("key1", 1) == 1; + assert cache().put("key2", 2) == 2; - checkContainsKey(true, "key1"); - checkContainsKey(true, "key2"); + checkContainsKey(true, "key1"); + checkContainsKey(true, "key2"); - assert cache().get("key1") != null; - assert cache().get("key2") != null; - assert cache().get("wrong") == null; + assert cache().get("key1") != null; + assert cache().get("key2") != null; + assert cache().get("wrong") == null; - tx.commit(); + tx.commit(); + } // Check outside transaction. checkContainsKey(true, "key1"); @@ -1192,11 +1207,17 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract public void testPutFiltered() throws Exception { IgniteTx tx = txEnabled() ? cache().txStart() : null; - cache().put("key1", 1, F.<String, Integer>cacheNoPeekValue()); - cache().put("key2", 100, gte100); + try { + cache().put("key1", 1, F.<String, Integer>cacheNoPeekValue()); + cache().put("key2", 100, gte100); - if (tx != null) - tx.commit(); + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } checkSize(F.asSet("key1")); @@ -1207,7 +1228,6 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assert i == null : "Why not null?: " + i; } - /** * @throws Exception In case of error. */ @@ -1330,11 +1350,17 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract public void testPutAsyncFiltered() throws Exception { IgniteTx tx = txEnabled() ? cache().txStart() : null; - assert cache().putAsync("key1", 1, gte100).get() == null; - assert cache().putAsync("key2", 101, F.<String, Integer>cacheNoPeekValue()).get() == null; + try { + assert cache().putAsync("key1", 1, gte100).get() == null; + assert cache().putAsync("key2", 101, F.<String, Integer>cacheNoPeekValue()).get() == null; - if (tx != null) - tx.commit(); + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } checkSize(F.asSet("key2")); @@ -1377,16 +1403,22 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract private void checkPutx(boolean inTx) throws Exception { IgniteTx tx = inTx ? cache().txStart() : null; - assert cache().putx("key1", 1); - assert cache().putx("key2", 2); - assert !cache().putx("wrong", 3, gte100); + try { + assert cache().putx("key1", 1); + assert cache().putx("key2", 2); + assert !cache().putx("wrong", 3, gte100); - // Check inside transaction. - assert cache().get("key1") == 1; - assert cache().get("key2") == 2; + // Check inside transaction. + assert cache().get("key1") == 1; + assert cache().get("key2") == 2; - if (tx != null) - tx.commit(); + if (tx != null) + tx.commit(); + } + finally { + if (tx != null) + tx.close(); + } checkSize(F.asSet("key1", "key2")); @@ -1611,25 +1643,31 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract public void testPutxAsync() throws Exception { IgniteTx tx = txEnabled() ? cache().txStart() : null; - cache().put("key2", 1); + try { + cache().put("key2", 1); - IgniteFuture<Boolean> fut1 = cache().putxAsync("key1", 10); - IgniteFuture<Boolean> fut2 = cache().putxAsync("key2", 11); + IgniteFuture<Boolean> fut1 = cache().putxAsync("key1", 10); + IgniteFuture<Boolean> fut2 = cache().putxAsync("key2", 11); - IgniteFuture<IgniteTx> f = null; + IgniteFuture<IgniteTx> f = null; - if (tx != null) { - tx = (IgniteTx)tx.enableAsync(); + if (tx != null) { + tx = (IgniteTx)tx.enableAsync(); - tx.commit(); + tx.commit(); - f = tx.future(); - } + f = tx.future(); + } - assert fut1.get(); - assert fut2.get(); + assert fut1.get(); + assert fut2.get(); - assert f == null || f.get().state() == COMMITTED; + assert f == null || f.get().state() == COMMITTED; + } + finally { + if (tx != null) + tx.close(); + } checkSize(F.asSet("key1", "key2")); @@ -2017,6 +2055,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assertFalse(cache().putxIfAbsentAsync("key3", 4).get()); + assertEquals((Integer)3, cache().get("key3")); + cache().evict("key2"); cache().clear("key3"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java index 466e178..1111437 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java @@ -55,7 +55,7 @@ public abstract class GridCacheAbstractMetricsSelfTest extends GridCacheAbstract * @return Expected number of inner reads. */ protected int expectedReadsPerPut(boolean isPrimary) { - return isPrimary ? 1 : 2; + return 1; } /** @@ -65,7 +65,7 @@ public abstract class GridCacheAbstractMetricsSelfTest extends GridCacheAbstract * @return Expected number of misses. */ protected int expectedMissesPerPut(boolean isPrimary) { - return isPrimary ? 1 : 2; + return 1; } /** {@inheritDoc} */