http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java deleted file mode 100644 index 2861d66..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java +++ /dev/null @@ -1,519 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.transactions.*; -import org.gridgain.grid.cache.*; -import org.gridgain.grid.kernal.processors.timeout.*; -import org.gridgain.grid.util.lang.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Transaction managed by cache ({@code 'Ex'} stands for external). - */ -public interface GridCacheTxEx<K, V> extends IgniteTx, GridTimeoutObject { - @SuppressWarnings("PublicInnerClass") - public enum FinalizationStatus { - /** Transaction was not finalized yet. */ - NONE, - - /** Transaction is being finalized by user. */ - USER_FINISH, - - /** Recovery request is received, user finish requests should be ignored. */ - RECOVERY_WAIT, - - /** Transaction is being finalized by recovery procedure. */ - RECOVERY_FINISH - } - - /** - * @return Size of the transaction. - */ - public int size(); - - /** - * @return {@code True} if transaction is allowed to use store.
- */ - public boolean storeEnabled(); - - /** - * @return {@code True} if transaction is allowed to use store and transaction spans one or more caches with - * store enabled. - */ - public boolean storeUsed(); - - /** - * Checks if this is system cache transaction. System transactions are isolated from user transactions - * because some of the public API methods may be invoked inside user transactions and internally start - * system cache transactions. - * - * @return {@code True} if transaction is started for system cache. - */ - public boolean system(); - - /** - * @return Last recorded topology version. - */ - public long topologyVersion(); - - /** - * @return Flag indicating whether transaction is implicit with only one key. - */ - public boolean implicitSingle(); - - /** - * @return Collection of cache IDs involved in this transaction. - */ - public Collection<Integer> activeCacheIds(); - - /** - * Attempts to set topology version and returns the current value. - * If topology version was previously set, then its value will - * be returned (but not updated). - * - * @param topVer Topology version. - * @return Recorded topology version. - */ - public long topologyVersion(long topVer); - - /** - * @return {@code True} if transaction is empty. - */ - public boolean empty(); - - /** - * @return {@code True} if transaction is group-locked. - */ - public boolean groupLock(); - - /** - * @return Group lock key if {@link #groupLock()} is {@code true}. - */ - @Nullable public GridCacheTxKey groupLockKey(); - - /** - * @return {@code True} if preparing flag was set with this call. - */ - public boolean markPreparing(); - - /** - * @param status Finalization status to set. - * @return {@code True} if mark was set. - */ - public boolean markFinalizing(FinalizationStatus status); - - /** - * @param cacheCtx Cache context. - * @param part Invalid partition. - */ - public void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int part); - - /** - * @return Invalid partitions.
- */ - public Set<Integer> invalidPartitions(); - - /** - * Gets owned version for near remote transaction. - * - * @param key Key to get version for. - * @return Owned version, if any. - */ - @Nullable public GridCacheVersion ownedVersion(GridCacheTxKey<K> key); - - /** - * Gets ID of additional node involved. For example, in DHT case, other node is - * near node ID. - * - * @return ID of the other node involved, if any. - */ - @Nullable public UUID otherNodeId(); - - /** - * @return Event node ID. - */ - public UUID eventNodeId(); - - /** - * Gets node ID which directly started this transaction. In case of DHT local transaction it will be - * near node ID, in case of DHT remote transaction it will be primary node ID, in case of replicated remote - * transaction it will be starter node ID. - * - * @return Originating node ID. - */ - public UUID originatingNodeId(); - - /** - * @return Master node IDs. - */ - public Collection<UUID> masterNodeIds(); - - /** - * @return Near transaction ID. - */ - @Nullable public GridCacheVersion nearXidVersion(); - - /** - * @return Transaction nodes mapping (primary node -> related backup nodes). - */ - @Nullable public Map<UUID, Collection<UUID>> transactionNodes(); - - /** - * @param entry Entry to check. - * @return {@code True} if lock is owned. - * @throws GridCacheEntryRemovedException If entry has been removed. - */ - public boolean ownsLock(GridCacheEntryEx<K, V> entry) throws GridCacheEntryRemovedException; - - /** - * @param entry Entry to check. - * @return {@code True} if lock is owned. - */ - public boolean ownsLockUnsafe(GridCacheEntryEx<K, V> entry); - - /** - * For Partitioned caches, this flag is {@code false} for remote DHT and remote NEAR - * transactions because serializability of transaction is enforced on primary node. All - * other transaction types must enforce it. - * - * @return Enforce serializable flag. - */ - public boolean enforceSerializable(); - - /** - * @return {@code True} if near transaction.
- */ - public boolean near(); - - /** - * @return {@code True} if DHT transaction. - */ - public boolean dht(); - - /** - * @return {@code True} if dht colocated transaction. - */ - public boolean colocated(); - - /** - * @return {@code True} if transaction is local, {@code false} if it's remote. - */ - public boolean local(); - - /** - * @return {@code True} if transaction is replicated. - */ - public boolean replicated(); - - /** - * @return ID of the subject that initiated this transaction. - */ - public UUID subjectId(); - - /** - * Task name hash in case transaction was initiated within task execution. - * - * @return Task name hash. - */ - public int taskNameHash(); - - /** - * @return {@code True} if transaction is user transaction, which means: - * <ul> - * <li>Explicit</li> - * <li>Local</li> - * <li>Not DHT</li> - * </ul> - */ - public boolean user(); - - /** - * @return {@code True} if transaction is configured with synchronous commit flag. - */ - public boolean syncCommit(); - - /** - * @return {@code True} if transaction is configured with synchronous rollback flag. - */ - public boolean syncRollback(); - - /** - * @param key Key to check. - * @return {@code True} if key is present. - */ - public boolean hasWriteKey(GridCacheTxKey<K> key); - - /** - * @return Read set. - */ - public Set<GridCacheTxKey<K>> readSet(); - - /** - * @return Write set. - */ - public Set<GridCacheTxKey<K>> writeSet(); - - /** - * @return All transaction entries. - */ - public Collection<GridCacheTxEntry<K, V>> allEntries(); - - /** - * @return Write entries. - */ - public Collection<GridCacheTxEntry<K, V>> writeEntries(); - - /** - * @return Read entries. - */ - public Collection<GridCacheTxEntry<K, V>> readEntries(); - - /** - * @return Transaction write map. - */ - public Map<GridCacheTxKey<K>, GridCacheTxEntry<K, V>> writeMap(); - - /** - * @return Transaction read map.
- */ - public Map<GridCacheTxKey<K>, GridCacheTxEntry<K, V>> readMap(); - - /** - * Gets pessimistic recovery writes, i.e. values that have never been sent to remote nodes with lock requests. - * - * @return Collection of recovery writes. - */ - public Collection<GridCacheTxEntry<K, V>> recoveryWrites(); - - /** - * Gets a list of entries that needs to be locked on the next step of prepare stage of - * optimistic transaction. - * - * @return List of tx entries for optimistic locking. - */ - public Collection<GridCacheTxEntry<K, V>> optimisticLockEntries(); - - /** - * Seals transaction for updates. - */ - public void seal(); - - /** - * @param key Key for the entry. - * @return Entry for the key (either from write set or read set). - */ - @Nullable public GridCacheTxEntry<K, V> entry(GridCacheTxKey<K> key); - - /** - * @param ctx Cache context. - * @param failFast Fail-fast flag. - * @param key Key to look up. - * @param filter Filter to check. - * @return Current value for the key within transaction. - * @throws GridCacheFilterFailedException If filter failed and failFast is {@code true}. - */ - @Nullable public GridTuple<V> peek( - GridCacheContext<K, V> ctx, - boolean failFast, - K key, - @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws GridCacheFilterFailedException; - - /** - * @return Start version. - */ - public GridCacheVersion startVersion(); - - /** - * @return Transaction version. - */ - public GridCacheVersion xidVersion(); - - /** - * @return Version created at commit time. - */ - public GridCacheVersion commitVersion(); - - /** - * @param commitVer Commit version. - * @return {@code True} if version was set. - */ - public boolean commitVersion(GridCacheVersion commitVer); - - /** - * @return End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>) - * assigned to this transaction at the end of write phase. - */ - public GridCacheVersion endVersion(); - - /** - * Prepare state. - * - * @throws IgniteCheckedException If failed.
- */ - public void prepare() throws IgniteCheckedException; - - /** - * Prepare stage. - * - * @return Future for prepare step. - */ - public IgniteFuture<GridCacheTxEx<K, V>> prepareAsync(); - - /** - * @param endVer End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>) - * assigned to this transaction at the end of write phase. - */ - public void endVersion(GridCacheVersion endVer); - - /** - * @return Transaction write version. For all transactions except DHT transactions, will be equal to - * {@link #xidVersion()}. - */ - public GridCacheVersion writeVersion(); - - /** - * Sets write version. - * - * @param ver Write version. - */ - public void writeVersion(GridCacheVersion ver); - - /** - * @return Future for transaction completion. - */ - public IgniteFuture<IgniteTx> finishFuture(); - - /** - * @param state Transaction state. - * @return {@code True} if transition was valid, {@code false} otherwise. - */ - public boolean state(IgniteTxState state); - - /** - * @param invalidate Invalidate flag. - */ - public void invalidate(boolean invalidate); - - /** - * @param sysInvalidate System invalidate flag. - */ - public void systemInvalidate(boolean sysInvalidate); - - /** - * @return System invalidate flag. - */ - public boolean isSystemInvalidate(); - - /** - * TODO-gg-4004 Put rollback async on public API? - * Asynchronously rollback this transaction. - * - * @return Rollback future. - */ - public IgniteFuture<IgniteTx> rollbackAsync(); - - /** - * Callback invoked whenever there is a lock that has been acquired - * by this transaction for any of the participating entries. - * - * @param entry Cache entry. - * @param owner Lock candidate that won ownership of the lock. - * @return {@code True} if transaction cared about notification. - */ - public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner); - - /** - * @return {@code True} if transaction timed out.
- */ - public boolean timedOut(); - - /** - * @return {@code True} if transaction had completed successfully or unsuccessfully. - */ - public boolean done(); - - /** - * @return {@code True} for OPTIMISTIC transactions. - */ - public boolean optimistic(); - - /** - * @return {@code True} for PESSIMISTIC transactions. - */ - public boolean pessimistic(); - - /** - * @return {@code True} if read-committed. - */ - public boolean readCommitted(); - - /** - * @return {@code True} if repeatable-read. - */ - public boolean repeatableRead(); - - /** - * @return {@code True} if serializable. - */ - public boolean serializable(); - - /** - * Checks whether given key has been removed within transaction. - * - * @param key Key to check. - * @return {@code True} if key has been removed. - */ - public boolean removed(GridCacheTxKey<K> key); - - /** - * Gets allowed remaining time for this transaction. - * - * @return Remaining time. - * @throws IgniteTxTimeoutException If transaction timed out. - */ - public long remainingTime() throws IgniteTxTimeoutException; - - /** - * @return Alternate transaction versions. - */ - public Collection<GridCacheVersion> alternateVersions(); - - /** - * @return {@code True} if transaction needs completed versions for processing. - */ - public boolean needsCompletedVersions(); - - /** - * @param base Base for committed versions. - * @param committed Committed transactions relative to base. - * @param rolledback Rolled back transactions relative to base. - */ - public void completedVersions(GridCacheVersion base, Collection<GridCacheVersion> committed, - Collection<GridCacheVersion> rolledback); - - /** - * @return {@code True} if transaction has at least one internal entry. - */ - public boolean internal(); - - /** - * @return {@code True} if transaction is a one-phase-commit transaction. - */ - public boolean onePhaseCommit(); - - /** - * @return {@code True} if transaction has transform entries.
This flag will be only set for local - * transactions. - */ - public boolean hasTransforms(); -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java deleted file mode 100644 index 47631e0..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java +++ /dev/null @@ -1,1492 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.transactions.*; -import org.gridgain.grid.kernal.processors.cache.distributed.*; -import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; -import org.gridgain.grid.kernal.processors.cache.distributed.near.*; -import org.gridgain.grid.util.future.*; -import org.gridgain.grid.util.lang.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -import static org.apache.ignite.transactions.IgniteTxConcurrency.*; -import static org.apache.ignite.transactions.IgniteTxIsolation.*; -import static org.apache.ignite.transactions.IgniteTxState.*; -import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*; -import static org.gridgain.grid.kernal.processors.cache.GridCacheTxEx.FinalizationStatus.*; -import static
org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*; - -/** - * Isolated logic to process cache messages. - */ -public class GridCacheTxHandler<K, V> { - /** Logger. */ - private IgniteLogger log; - - /** Shared cache context. */ - private GridCacheSharedContext<K, V> ctx; - - /** - * Processes a near prepare request received from another node by delegating to - * {@link #prepareTx(UUID, GridNearTxLocal, GridNearTxPrepareRequest)} without a local transaction. - * - * @param nearNodeId Near node ID that initiated transaction. - * @param req Near prepare request. - * @return Future for transaction. - */ - public IgniteFuture<GridCacheTxEx<K, V>> processNearTxPrepareRequest(final UUID nearNodeId, - final GridNearTxPrepareRequest<K, V> req) { - return prepareTx(nearNodeId, null, req); - } - - /** - * @param ctx Shared cache context. - */ - public GridCacheTxHandler(GridCacheSharedContext<K, V> ctx) { - this.ctx = ctx; - - log = ctx.logger(GridCacheTxHandler.class); - - // Each handler registered below forwards one transaction message type to its process* method. - ctx.io().addHandler(0, GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() { - @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) { - processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest<K, V>)msg); - } - }); - - ctx.io().addHandler(0, GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() { - @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) { - processNearTxPrepareResponse(nodeId, (GridNearTxPrepareResponse<K, V>)msg); - } - }); - - ctx.io().addHandler(0, GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() { - @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) { - processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest<K, V>)msg); - } - }); - - ctx.io().addHandler(0, GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() { - @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) { - processNearTxFinishResponse(nodeId, (GridNearTxFinishResponse<K, V>)msg); - } - }); - - ctx.io().addHandler(0, GridDhtTxPrepareRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() { - @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) { - processDhtTxPrepareRequest(nodeId, (GridDhtTxPrepareRequest<K, V>)msg); - } - }); - - ctx.io().addHandler(0,
GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() { - @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) { - processDhtTxPrepareResponse(nodeId, (GridDhtTxPrepareResponse<K, V>)msg); - } - }); - - ctx.io().addHandler(0, GridDhtTxFinishRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() { - @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) { - processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest<K, V>)msg); - } - }); - - ctx.io().addHandler(0, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() { - @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) { - processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse<K, V>)msg); - } - }); - - ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxRequest.class, - new CI2<UUID, GridCacheOptimisticCheckPreparedTxRequest<K, V>>() { - @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxRequest<K, V> req) { - processCheckPreparedTxRequest(nodeId, req); - } - }); - - ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxResponse.class, - new CI2<UUID, GridCacheOptimisticCheckPreparedTxResponse<K, V>>() { - @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse<K, V> res) { - processCheckPreparedTxResponse(nodeId, res); - } - }); - - ctx.io().addHandler(0, GridCachePessimisticCheckCommittedTxRequest.class, - new CI2<UUID, GridCachePessimisticCheckCommittedTxRequest<K, V>>() { - @Override public void apply(UUID nodeId, GridCachePessimisticCheckCommittedTxRequest<K, V> req) { - processCheckCommittedTxRequest(nodeId, req); - } - }); - - ctx.io().addHandler(0, GridCachePessimisticCheckCommittedTxResponse.class, - new CI2<UUID, GridCachePessimisticCheckCommittedTxResponse<K, V>>() { - @Override public void apply(UUID nodeId, GridCachePessimisticCheckCommittedTxResponse<K, V> res) { - processCheckCommittedTxResponse(nodeId, res); - } - }); - } - - /** - * @param nearNodeId
Near node ID that initiated transaction. - * @param locTx Optional local transaction. - * @param req Near prepare request. - * @return Future for transaction. - */ - public IgniteFuture<GridCacheTxEx<K, V>> prepareTx(final UUID nearNodeId, @Nullable GridNearTxLocal<K, V> locTx, - final GridNearTxPrepareRequest<K, V> req) { - assert nearNodeId != null; - assert req != null; - - if (locTx != null) { - if (req.near()) { - // Make sure not to provide Near entries to DHT cache. - req.cloneEntries(); - - return prepareNearTx(nearNodeId, req); - } - else - return prepareColocatedTx(locTx, req); - } - else - return prepareNearTx(nearNodeId, req); - } - - /** - * Prepares local colocated tx. - * - * @param locTx Local transaction. - * @param req Near prepare request. - * @return Prepare future. - */ - private IgniteFuture<GridCacheTxEx<K, V>> prepareColocatedTx(final GridNearTxLocal<K, V> locTx, - final GridNearTxPrepareRequest<K, V> req) { - - IgniteFuture<Object> fut = new GridFinishedFutureEx<>(); // TODO force preload keys. - - return new GridEmbeddedFuture<>( - ctx.kernalContext(), - fut, - new C2<Object, Exception, IgniteFuture<GridCacheTxEx<K, V>>>() { - @Override public IgniteFuture<GridCacheTxEx<K, V>> apply(Object o, Exception ex) { - if (ex != null) - throw new GridClosureException(ex); - - IgniteFuture<GridCacheTxEx<K, V>> fut = locTx.prepareAsyncLocal(req.reads(), req.writes(), - req.transactionNodes(), req.last(), req.lastBackups()); - - if (locTx.isRollbackOnly()) - locTx.rollbackAsync(); - - return fut; - } - }, - new C2<GridCacheTxEx<K, V>, Exception, GridCacheTxEx<K, V>>() { - @Nullable @Override public GridCacheTxEx<K, V> apply(GridCacheTxEx<K, V> tx, Exception e) { - if (e != null) { - // tx can be null if exception occurred. - if (tx != null) - tx.setRollbackOnly(); // Just in case.
- - if (!(e instanceof IgniteTxOptimisticException)) - U.error(log, "Failed to prepare DHT transaction: " + tx, e); - } - - return tx; - } - } - ); - } - - /** - * Prepares near transaction. - * - * @param nearNodeId Near node ID that initiated transaction. - * @param req Near prepare request. - * @return Prepare future, or {@code null} if the near node has already left the grid. - */ - private IgniteFuture<GridCacheTxEx<K, V>> prepareNearTx(final UUID nearNodeId, - final GridNearTxPrepareRequest<K, V> req) { - ClusterNode nearNode = ctx.node(nearNodeId); - - if (nearNode == null) { - if (log.isDebugEnabled()) - log.debug("Received transaction request from node that left grid (will ignore): " + nearNodeId); - - return null; - } - - try { - for (GridCacheTxEntry<K, V> e : F.concat(false, req.reads(), req.writes())) - e.unmarshal(ctx, false, ctx.deploy().globalLoader()); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(ctx.kernalContext(), e); - } - - GridDhtTxLocal<K, V> tx; - - GridCacheVersion mappedVer = ctx.tm().mappedVersion(req.version()); - - if (mappedVer != null) { - tx = ctx.tm().tx(mappedVer); - - if (tx == null) - U.warn(log, "Missing local transaction for mapped near version [nearVer=" + req.version() - + ", mappedVer=" + mappedVer + ']'); - } - else { - tx = new GridDhtTxLocal<>( - ctx, - nearNode.id(), - req.version(), - req.futureId(), - req.miniId(), - req.threadId(), - /*implicit*/false, - /*implicit-single*/false, - req.system(), - req.concurrency(), - req.isolation(), - req.timeout(), - req.isInvalidate(), - false, - req.txSize(), - req.groupLockKey(), - req.partitionLock(), - req.transactionNodes(), - req.subjectId(), - req.taskNameHash() - ); - - tx = ctx.tm().onCreated(tx); - - // onCreated() may return null; in that branch tx must not be dereferenced, so the warning - // reports the version carried by the request instead. - if (tx != null) - tx.topologyVersion(req.topologyVersion()); - else - U.warn(log, "Failed to create local transaction (was transaction rolled back?)
[xid=" + - req.version() + ", req=" + req + ']'); - } - - if (tx != null) { - IgniteFuture<GridCacheTxEx<K, V>> fut = tx.prepareAsync(req.reads(), req.writes(), - req.dhtVersions(), req.messageId(), req.miniId(), req.transactionNodes(), req.last(), - req.lastBackups()); - - if (tx.isRollbackOnly()) { - try { - tx.rollback(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to rollback transaction: " + tx, e); - } - } - - final GridDhtTxLocal<K, V> tx0 = tx; - - fut.listenAsync(new CI1<IgniteFuture<GridCacheTxEx<K, V>>>() { - @Override public void apply(IgniteFuture<GridCacheTxEx<K, V>> txFut) { - try { - txFut.get(); - } - catch (IgniteCheckedException e) { - tx0.setRollbackOnly(); // Just in case. - - if (!(e instanceof IgniteTxOptimisticException)) - U.error(log, "Failed to prepare DHT transaction: " + tx0, e); - } - } - }); - - return fut; - } - else - return new GridFinishedFuture<>(ctx.kernalContext(), (GridCacheTxEx<K, V>)null); - } - - /** - * @param nodeId Node ID. - * @param res Response. - */ - private void processNearTxPrepareResponse(UUID nodeId, GridNearTxPrepareResponse<K, V> res) { - GridNearTxPrepareFuture<K, V> fut = (GridNearTxPrepareFuture<K, V>)ctx.mvcc() - .<GridCacheTxEx<K, V>>future(res.version(), res.futureId()); - - if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Failed to find future for prepare response [sender=" + nodeId + ", res=" + res + ']'); - - return; - } - - fut.onResult(nodeId, res); - } - - /** - * @param nodeId Node ID. - * @param res Response.
- */ - private void processNearTxFinishResponse(UUID nodeId, GridNearTxFinishResponse<K, V> res) { - ctx.tm().onFinishedRemote(nodeId, res.threadId()); - - GridNearTxFinishFuture<K, V> fut = (GridNearTxFinishFuture<K, V>)ctx.mvcc().<IgniteTx>future( - res.xid(), res.futureId()); - - if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Failed to find future for finish response [sender=" + nodeId + ", res=" + res + ']'); - - return; - } - - fut.onResult(nodeId, res); - } - - /** - * @param nodeId Node ID. - * @param res Response. - */ - private void processDhtTxPrepareResponse(UUID nodeId, GridDhtTxPrepareResponse<K, V> res) { - GridDhtTxPrepareFuture<K, V> fut = (GridDhtTxPrepareFuture<K, V>)ctx.mvcc(). - <GridCacheTxEx<K, V>>future(res.version(), res.futureId()); - - if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Received response for unknown future (will ignore): " + res); - - return; - } - - fut.onResult(nodeId, res); - } - - /** - * @param nodeId Node ID. - * @param res Response. - */ - private void processDhtTxFinishResponse(UUID nodeId, GridDhtTxFinishResponse<K, V> res) { - assert nodeId != null; - assert res != null; - - GridDhtTxFinishFuture<K, V> fut = (GridDhtTxFinishFuture<K, V>)ctx.mvcc().<IgniteTx>future(res.xid(), - res.futureId()); - - if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Received response for unknown future (will ignore): " + res); - - return; - } - - fut.onResult(nodeId, res); - } - - /** - * @param nodeId Node ID. - * @param req Request. - * @return Future. - */ - @Nullable public IgniteFuture<IgniteTx> processNearTxFinishRequest(UUID nodeId, GridNearTxFinishRequest<K, V> req) { - return finish(nodeId, null, req); - } - - /** - * Finishes a transaction: the colocated part and/or the DHT-local part, depending on how the - * optional local transaction is mapped. When both parts are finished, a compound future is returned. - * - * @param nodeId Node ID. - * @param locTx Optional local transaction. - * @param req Request. - * @return Future (may be {@code null}).
- */ - @Nullable public IgniteFuture<IgniteTx> finish(UUID nodeId, @Nullable GridNearTxLocal<K, V> locTx, - GridNearTxFinishRequest<K, V> req) { - assert nodeId != null; - assert req != null; - - // Transaction on local cache only. - if (locTx != null && !locTx.nearLocallyMapped() && !locTx.colocatedLocallyMapped()) - return new GridFinishedFutureEx<IgniteTx>(locTx); - - if (log.isDebugEnabled()) - log.debug("Processing near tx finish request [nodeId=" + nodeId + ", req=" + req + "]"); - - IgniteFuture<IgniteTx> colocatedFinishFut = null; - - if (locTx != null && locTx.colocatedLocallyMapped()) - colocatedFinishFut = finishColocatedLocal(req.commit(), locTx); - - IgniteFuture<IgniteTx> nearFinishFut = null; - - if (locTx == null || locTx.nearLocallyMapped()) { - if (locTx != null) - req.cloneEntries(); - - nearFinishFut = finishDhtLocal(nodeId, locTx, req); - } - - if (colocatedFinishFut != null && nearFinishFut != null) { - GridCompoundFuture<IgniteTx, IgniteTx> res = new GridCompoundFuture<>(ctx.kernalContext()); - - res.add(colocatedFinishFut); - res.add(nearFinishFut); - - res.markInitialized(); - - return res; - } - - if (colocatedFinishFut != null) - return colocatedFinishFut; - - return nearFinishFut; - } - - /** - * @param nodeId Node ID initiated commit. - * @param locTx Optional local transaction. - * @param req Finish request. - * @return Finish future.
- */ - private IgniteFuture<IgniteTx> finishDhtLocal(UUID nodeId, @Nullable GridNearTxLocal<K, V> locTx, - GridNearTxFinishRequest<K, V> req) { - GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version()); - - GridDhtTxLocal<K, V> tx = null; - - if (dhtVer == null) { - if (log.isDebugEnabled()) - log.debug("Received transaction finish request for unknown near version (was lock explicit?): " + req); - } - else - tx = ctx.tm().tx(dhtVer); - - if (tx == null && !req.explicitLock()) { - assert locTx == null : "DHT local tx should never be lost for near local tx: " + locTx; - - U.warn(log, "Received finish request for completed transaction (the message may be too late " + - "and transaction could have been DGCed by now) [commit=" + req.commit() + - ", xid=" + req.version() + ']'); - - // Always send finish response. - GridCacheMessage<K, V> res = new GridNearTxFinishResponse<>(req.version(), req.threadId(), req.futureId(), - req.miniId(), new IgniteCheckedException("Transaction has been already completed.")); - - try { - ctx.io().send(nodeId, res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); - } - catch (Throwable e) { - // Double-check. - if (ctx.discovery().node(nodeId) == null) { - if (log.isDebugEnabled()) - log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res + - ']'); - } - else - U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", " + - "res=" + res + ']', e); - } - - return null; - } - - try { - if (req.commit()) { - if (tx == null) { - // Create transaction and add entries. - tx = ctx.tm().onCreated( - new GridDhtTxLocal<>( - ctx, - nodeId, - req.version(), - req.futureId(), - req.miniId(), - req.threadId(), - true, - false, /* we don't know, so assume false. 
*/ - req.system(), - PESSIMISTIC, - READ_COMMITTED, - /*timeout */0, - req.isInvalidate(), - req.storeEnabled(), - req.txSize(), - req.groupLockKey(), - false, - null, - req.subjectId(), - req.taskNameHash())); - - if (tx == null || !ctx.tm().onStarted(tx)) - throw new IgniteTxRollbackException("Attempt to start a completed transaction: " + req); - - tx.topologyVersion(req.topologyVersion()); - } - - tx.storeEnabled(req.storeEnabled()); - - if (!tx.markFinalizing(USER_FINISH)) { - if (log.isDebugEnabled()) - log.debug("Will not finish transaction (it is handled by another thread): " + tx); - - return null; - } - - if (!tx.syncCommit()) - tx.syncCommit(req.syncCommit()); - - tx.nearFinishFutureId(req.futureId()); - tx.nearFinishMiniId(req.miniId()); - tx.recoveryWrites(req.recoveryWrites()); - - Collection<GridCacheTxEntry<K, V>> writeEntries = req.writes(); - - if (!F.isEmpty(writeEntries)) { - // In OPTIMISTIC mode, we get the values at PREPARE stage. - assert tx.concurrency() == PESSIMISTIC; - - for (GridCacheTxEntry<K, V> entry : writeEntries) - tx.addEntry(req.messageId(), entry); - } - - if (tx.pessimistic()) - tx.prepare(); - - IgniteFuture<IgniteTx> commitFut = tx.commitAsync(); - - // Only for error logging. - commitFut.listenAsync(CU.errorLogger(log)); - - return commitFut; - } - else { - assert tx != null : "Transaction is null for near rollback request [nodeId=" + - nodeId + ", req=" + req + "]"; - - tx.syncRollback(req.syncRollback()); - - tx.nearFinishFutureId(req.futureId()); - tx.nearFinishMiniId(req.miniId()); - - IgniteFuture<IgniteTx> rollbackFut = tx.rollbackAsync(); - - // Only for error logging. - rollbackFut.listenAsync(CU.errorLogger(log)); - - return rollbackFut; - } - } - catch (Throwable e) { - U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e); - - if (tx != null) { - IgniteFuture<IgniteTx> rollbackFut = tx.rollbackAsync(); - - // Only for error logging. 
- rollbackFut.listenAsync(CU.errorLogger(log)); - - return rollbackFut; - } - - return new GridFinishedFuture<>(ctx.kernalContext(), e); - } - } - - /** - * @param commit Commit flag (rollback if {@code false}). - * @param tx Transaction to commit. - * @return Future. - */ - public IgniteFuture<IgniteTx> finishColocatedLocal(boolean commit, GridNearTxLocal<K, V> tx) { - try { - if (commit) { - if (!tx.markFinalizing(USER_FINISH)) { - if (log.isDebugEnabled()) - log.debug("Will not finish transaction (it is handled by another thread): " + tx); - - return null; - } - - return tx.commitAsyncLocal(); - } - else - return tx.rollbackAsyncLocal(); - } - catch (Throwable e) { - U.error(log, "Failed completing transaction [commit=" + commit + ", tx=" + tx + ']', e); - - if (tx != null) - return tx.rollbackAsync(); - - return new GridFinishedFuture<>(ctx.kernalContext(), e); - } - } - - /** - * @param nodeId Sender node ID. - * @param req Request. - */ - protected final void processDhtTxPrepareRequest(UUID nodeId, GridDhtTxPrepareRequest<K, V> req) { - assert nodeId != null; - assert req != null; - - if (log.isDebugEnabled()) - log.debug("Processing dht tx prepare request [locNodeId=" + ctx.localNodeId() + - ", nodeId=" + nodeId + ", req=" + req + ']'); - - GridDhtTxRemote<K, V> dhtTx = null; - GridNearTxRemote<K, V> nearTx = null; - - GridDhtTxPrepareResponse<K, V> res; - - try { - res = new GridDhtTxPrepareResponse<>(req.version(), req.futureId(), req.miniId()); - - // Start near transaction first. - nearTx = !F.isEmpty(req.nearWrites()) ? startNearRemoteTx(ctx.deploy().globalLoader(), nodeId, req) : null; - dhtTx = startRemoteTx(nodeId, req, res); - - // Set evicted keys from near transaction. 
- if (nearTx != null) - res.nearEvicted(nearTx.evicted()); - - if (dhtTx != null && !F.isEmpty(dhtTx.invalidPartitions())) - res.invalidPartitions(dhtTx.invalidPartitions()); - } - catch (IgniteCheckedException e) { - if (e instanceof IgniteTxRollbackException) - U.error(log, "Transaction was rolled back before prepare completed: " + dhtTx, e); - else if (e instanceof IgniteTxOptimisticException) { - if (log.isDebugEnabled()) - log.debug("Optimistic failure for remote transaction (will rollback): " + dhtTx); - } - else - U.error(log, "Failed to process prepare request: " + req, e); - - if (nearTx != null) - nearTx.rollback(); - - if (dhtTx != null) - dhtTx.rollback(); - - res = new GridDhtTxPrepareResponse<>(req.version(), req.futureId(), req.miniId(), e); - } - - try { - // Reply back to sender. - ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); - } - catch (IgniteCheckedException e) { - if (e instanceof ClusterTopologyException) { - if (log.isDebugEnabled()) - log.debug("Failed to send tx response to remote node (node left grid) [node=" + nodeId + - ", xid=" + req.version()); - } - else - U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [node=" + nodeId + - ", xid=" + req.version() + ", err=" + e.getMessage() + ']'); - - if (nearTx != null) - nearTx.rollback(); - - if (dhtTx != null) - dhtTx.rollback(); - } - } - - /** - * @param nodeId Node ID. - * @param req Request. 
- */ - @SuppressWarnings({"unchecked"}) - protected final void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest<K, V> req) { - assert nodeId != null; - assert req != null; - - if (log.isDebugEnabled()) - log.debug("Processing dht tx finish request [nodeId=" + nodeId + ", req=" + req + ']'); - - GridDhtTxRemote<K, V> dhtTx = ctx.tm().tx(req.version()); - GridNearTxRemote<K, V> nearTx = ctx.tm().nearTx(req.version()); - - try { - if (dhtTx == null && !F.isEmpty(req.writes())) - dhtTx = startRemoteTxForFinish(nodeId, req); - - if (dhtTx != null) { - dhtTx.syncCommit(req.syncCommit()); - dhtTx.syncRollback(req.syncRollback()); - } - - // One-phase commit transactions send finish requests to backup nodes. - if (dhtTx != null && req.onePhaseCommit()) { - dhtTx.onePhaseCommit(true); - - dhtTx.writeVersion(req.writeVersion()); - } - - if (nearTx == null && !F.isEmpty(req.nearWrites()) && req.groupLock()) - nearTx = startNearRemoteTxForFinish(nodeId, req); - - if (nearTx != null) { - nearTx.syncCommit(req.syncCommit()); - nearTx.syncRollback(req.syncRollback()); - } - } - catch (IgniteTxRollbackException e) { - if (log.isDebugEnabled()) - log.debug("Received finish request for completed transaction (will ignore) [req=" + req + ", err=" + - e.getMessage() + ']'); - - sendReply(nodeId, req); - - return; - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to start remote DHT and Near transactions (will invalidate transactions) [dhtTx=" + - dhtTx + ", nearTx=" + nearTx + ']', e); - - if (dhtTx != null) - dhtTx.invalidate(true); - - if (nearTx != null) - nearTx.invalidate(true); - } - catch (GridDistributedLockCancelledException ignore) { - U.warn(log, "Received commit request to cancelled lock (will invalidate transaction) [dhtTx=" + - dhtTx + ", nearTx=" + nearTx + ']'); - - if (dhtTx != null) - dhtTx.invalidate(true); - - if (nearTx != null) - nearTx.invalidate(true); - } - - // Safety - local transaction will finish explicitly. 
- if (nearTx != null && nearTx.local()) - nearTx = null; - - finish(nodeId, dhtTx, req, req.writes()); - - if (nearTx != null) - finish(nodeId, nearTx, req, req.nearWrites()); - - sendReply(nodeId, req); - } - - /** - * @param nodeId Node ID. - * @param tx Transaction. - * @param req Request. - * @param writes Writes. - */ - protected void finish( - UUID nodeId, - GridCacheTxRemoteEx<K, V> tx, - GridDhtTxFinishRequest<K, V> req, - Collection<GridCacheTxEntry<K, V>> writes) { - // We don't allow explicit locks for transactions and - // therefore immediately return if transaction is null. - // However, we may decide to relax this restriction in - // future. - if (tx == null) { - if (req.commit()) - // Must be some long time duplicate, but we add it anyway. - ctx.tm().addCommittedTx(req.version(), null); - else - ctx.tm().addRolledbackTx(req.version()); - - if (log.isDebugEnabled()) - log.debug("Received finish request for non-existing transaction (added to completed set) " + - "[senderNodeId=" + nodeId + ", res=" + req + ']'); - - return; - } - else if (log.isDebugEnabled()) - log.debug("Received finish request for transaction [senderNodeId=" + nodeId + ", req=" + req + - ", tx=" + tx + ']'); - - try { - if (req.commit() || req.isSystemInvalidate()) { - if (tx.commitVersion(req.commitVersion())) { - tx.invalidate(req.isInvalidate()); - tx.systemInvalidate(req.isSystemInvalidate()); - - if (!F.isEmpty(writes)) { - // In OPTIMISTIC mode, we get the values at PREPARE stage. - assert tx.concurrency() == PESSIMISTIC; - - for (GridCacheTxEntry<K, V> entry : writes) { - if (log.isDebugEnabled()) - log.debug("Unmarshalled transaction entry from pessimistic transaction [key=" + - entry.key() + ", value=" + entry.value() + ", tx=" + tx + ']'); - - if (!tx.setWriteValue(entry)) - U.warn(log, "Received entry to commit that was not present in transaction [entry=" + - entry + ", tx=" + tx + ']'); - } - } - - // Complete remote candidates. 
- tx.doneRemote(req.baseVersion(), null, null, null); - - if (tx.pessimistic()) - tx.prepare(); - - tx.commit(); - } - } - else { - assert tx != null; - - tx.doneRemote(req.baseVersion(), null, null, null); - - tx.rollback(); - } - } - catch (Throwable e) { - U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e); - - // Mark transaction for invalidate. - tx.invalidate(true); - tx.systemInvalidate(true); - - try { - tx.commit(); - } - catch (IgniteCheckedException ex) { - U.error(log, "Failed to invalidate transaction: " + tx, ex); - } - } - } - - /** - * Sends tx finish response to remote node, if response is requested. - * - * @param nodeId Node id that originated finish request. - * @param req Request. - */ - protected void sendReply(UUID nodeId, GridDhtTxFinishRequest<K, V> req) { - if (req.replyRequired()) { - GridCacheMessage<K, V> res = new GridDhtTxFinishResponse<>(req.version(), req.futureId(), req.miniId()); - - try { - ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); - } - catch (Throwable e) { - // Double-check. - if (ctx.discovery().node(nodeId) == null) { - if (log.isDebugEnabled()) - log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res + ']'); - } - else - U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", res=" + res + ']', e); - } - } - } - - /** - * @param nodeId Node ID. - * @param req Request. - * @param res Response. - * @return Remote transaction. - * @throws IgniteCheckedException If failed. 
- */ - @Nullable GridDhtTxRemote<K, V> startRemoteTx(UUID nodeId, - GridDhtTxPrepareRequest<K, V> req, - GridDhtTxPrepareResponse<K, V> res) throws IgniteCheckedException { - if (!F.isEmpty(req.writes())) { - GridDhtTxRemote<K, V> tx = ctx.tm().tx(req.version()); - - assert F.isEmpty(req.candidatesByKey()); - - if (tx == null) { - tx = new GridDhtTxRemote<>( - ctx, - req.nearNodeId(), - req.futureId(), - nodeId, - req.threadId(), - req.topologyVersion(), - req.version(), - req.commitVersion(), - req.system(), - req.concurrency(), - req.isolation(), - req.isInvalidate(), - req.timeout(), - req.writes() != null ? Math.max(req.writes().size(), req.txSize()) : req.txSize(), - req.groupLockKey(), - req.nearXidVersion(), - req.transactionNodes(), - req.subjectId(), - req.taskNameHash()); - - tx = ctx.tm().onCreated(tx); - - if (tx == null || !ctx.tm().onStarted(tx)) { - if (log.isDebugEnabled()) - log.debug("Attempt to start a completed transaction (will ignore): " + tx); - - return null; - } - } - - if (!tx.isSystemInvalidate() && !F.isEmpty(req.writes())) { - int idx = 0; - - for (GridCacheTxEntry<K, V> entry : req.writes()) { - GridCacheContext<K, V> cacheCtx = entry.context(); - - tx.addWrite(entry, ctx.deploy().globalLoader()); - - if (isNearEnabled(cacheCtx) && req.invalidateNearEntry(idx)) - invalidateNearEntry(cacheCtx, entry.key(), req.version()); - - try { - if (req.needPreloadKey(idx)) { - GridCacheEntryEx<K, V> cached = entry.cached(); - - if (cached == null) - cached = cacheCtx.cache().entryEx(entry.key(), req.topologyVersion()); - - GridCacheEntryInfo<K, V> info = cached.info(); - - if (info != null && !info.isNew() && !info.isDeleted()) - res.addPreloadEntry(info); - } - } - catch (GridDhtInvalidPartitionException e) { - tx.addInvalidPartition(cacheCtx, e.partition()); - - tx.clearEntry(entry.txKey()); - } - - idx++; - } - } - - // Prepare prior to reordering, so the pending locks added - // in prepare phase will get properly ordered as well. 
- tx.prepare(); - - if (req.last()) - tx.state(PREPARED); - - res.invalidPartitions(tx.invalidPartitions()); - - if (tx.empty() && req.last()) { - tx.rollback(); - - return null; - } - - return tx; - } - - return null; - } - - /** - * @param key Key - * @param ver Version. - * @throws IgniteCheckedException If invalidate failed. - */ - private void invalidateNearEntry(GridCacheContext<K, V> cacheCtx, K key, GridCacheVersion ver) - throws IgniteCheckedException { - GridNearCacheAdapter<K, V> near = cacheCtx.isNear() ? cacheCtx.near() : cacheCtx.dht().near(); - - GridCacheEntryEx<K, V> nearEntry = near.peekEx(key); - - if (nearEntry != null) - nearEntry.invalidate(null, ver); - } - - /** - * Called while processing dht tx prepare request. - * - * @param ldr Loader. - * @param nodeId Sender node ID. - * @param req Request. - * @return Remote transaction. - * @throws IgniteCheckedException If failed. - */ - @Nullable public GridNearTxRemote<K, V> startNearRemoteTx(ClassLoader ldr, UUID nodeId, - GridDhtTxPrepareRequest<K, V> req) throws IgniteCheckedException { - assert F.isEmpty(req.candidatesByKey()); - - if (!F.isEmpty(req.nearWrites())) { - GridNearTxRemote<K, V> tx = ctx.tm().nearTx(req.version()); - - if (tx == null) { - tx = new GridNearTxRemote<>( - ctx, - ldr, - nodeId, - req.nearNodeId(), - req.threadId(), - req.version(), - req.commitVersion(), - req.system(), - req.concurrency(), - req.isolation(), - req.isInvalidate(), - req.timeout(), - req.nearWrites(), - req.txSize(), - req.groupLockKey(), - req.subjectId(), - req.taskNameHash() - ); - - if (!tx.empty()) { - tx = ctx.tm().onCreated(tx); - - if (tx == null || !ctx.tm().onStarted(tx)) - throw new IgniteTxRollbackException("Attempt to start a completed transaction: " + tx); - } - } - else - tx.addEntries(ldr, req.nearWrites()); - - tx.ownedVersions(req.owned()); - - // Prepare prior to reordering, so the pending locks added - // in prepare phase will get properly ordered as well. 
- tx.prepare(); - - return tx; - } - - return null; - } - - /** - * @param nodeId Primary node ID. - * @param req Request. - * @return Remote transaction. - * @throws IgniteCheckedException If failed. - * @throws GridDistributedLockCancelledException If lock has been cancelled. - */ - @SuppressWarnings({"RedundantTypeArguments"}) - @Nullable GridDhtTxRemote<K, V> startRemoteTxForFinish(UUID nodeId, GridDhtTxFinishRequest<K, V> req) - throws IgniteCheckedException, GridDistributedLockCancelledException { - - GridDhtTxRemote<K, V> tx = null; - - boolean marked = false; - - for (GridCacheTxEntry<K, V> txEntry : req.writes()) { - GridDistributedCacheEntry<K, V> entry = null; - - GridCacheContext<K, V> cacheCtx = txEntry.context(); - - while (true) { - try { - int part = cacheCtx.affinity().partition(txEntry.key()); - - GridDhtLocalPartition<K, V> locPart = cacheCtx.topology().localPartition(part, - req.topologyVersion(), false); - - // Handle implicit locks for pessimistic transactions. - if (tx == null) - tx = ctx.tm().tx(req.version()); - - if (locPart == null || !locPart.reserve()) { - if (log.isDebugEnabled()) - log.debug("Local partition for given key is already evicted (will remove from tx) " + - "[key=" + txEntry.key() + ", part=" + part + ", locPart=" + locPart + ']'); - - if (tx != null) - tx.clearEntry(txEntry.txKey()); - - break; - } - - try { - entry = (GridDistributedCacheEntry<K, V>)cacheCtx.cache().entryEx(txEntry.key(), - req.topologyVersion()); - - if (tx == null) { - tx = new GridDhtTxRemote<>( - ctx, - req.nearNodeId(), - req.futureId(), - nodeId, - // We can pass null as nearXidVersion as transaction will be committed right away. 
- null, - req.threadId(), - req.topologyVersion(), - req.version(), - /*commitVer*/null, - req.system(), - PESSIMISTIC, - req.isolation(), - req.isInvalidate(), - 0, - req.txSize(), - req.groupLockKey(), - req.subjectId(), - req.taskNameHash()); - - tx = ctx.tm().onCreated(tx); - - if (tx == null || !ctx.tm().onStarted(tx)) - throw new IgniteTxRollbackException("Failed to acquire lock " + - "(transaction has been completed): " + req.version()); - } - - tx.addWrite(cacheCtx, txEntry.op(), txEntry.txKey(), txEntry.keyBytes(), txEntry.value(), - txEntry.valueBytes(), txEntry.transformClosures(), txEntry.drVersion()); - - if (!marked) { - if (tx.markFinalizing(USER_FINISH)) - marked = true; - else { - tx.clearEntry(txEntry.txKey()); - - return null; - } - } - - // Add remote candidate before reordering. - if (txEntry.explicitVersion() == null && !txEntry.groupLockEntry()) - entry.addRemote( - req.nearNodeId(), - nodeId, - req.threadId(), - req.version(), - 0, - /*tx*/true, - tx.implicitSingle(), - null - ); - - // Double-check in case if sender node left the grid. - if (ctx.discovery().node(req.nearNodeId()) == null) { - if (log.isDebugEnabled()) - log.debug("Node requesting lock left grid (lock request will be ignored): " + req); - - tx.rollback(); - - return null; - } - - // Entry is legit. 
- break; - } - finally { - locPart.release(); - } - } - catch (GridCacheEntryRemovedException ignored) { - assert entry.obsoleteVersion() != null : "Obsolete flag not set on removed entry: " + - entry; - - if (log.isDebugEnabled()) - log.debug("Received entry removed exception (will retry on renewed entry): " + entry); - - tx.clearEntry(txEntry.txKey()); - - if (log.isDebugEnabled()) - log.debug("Cleared removed entry from remote transaction (will retry) [entry=" + - entry + ", tx=" + tx + ']'); - } - catch (GridDhtInvalidPartitionException p) { - if (log.isDebugEnabled()) - log.debug("Received invalid partition (will clear entry from tx) [part=" + p + ", req=" + - req + ", txEntry=" + txEntry + ']'); - - if (tx != null) - tx.clearEntry(txEntry.txKey()); - - break; - } - } - } - - if (tx != null && tx.empty()) { - tx.rollback(); - - return null; - } - - return tx; - } - - /** - * @param nodeId Primary node ID. - * @param req Request. - * @return Remote transaction. - * @throws IgniteCheckedException If failed. - * @throws GridDistributedLockCancelledException If lock has been cancelled. - */ - @SuppressWarnings({"RedundantTypeArguments"}) - @Nullable public GridNearTxRemote<K, V> startNearRemoteTxForFinish(UUID nodeId, GridDhtTxFinishRequest<K, V> req) - throws IgniteCheckedException, GridDistributedLockCancelledException { - assert req.groupLock(); - - GridNearTxRemote<K, V> tx = null; - - ClassLoader ldr = ctx.deploy().globalLoader(); - - if (ldr != null) { - boolean marked = false; - - for (GridCacheTxEntry<K, V> txEntry : req.nearWrites()) { - GridDistributedCacheEntry<K, V> entry = null; - - GridCacheContext<K, V> cacheCtx = txEntry.context(); - - while (true) { - try { - entry = cacheCtx.near().peekExx(txEntry.key()); - - if (entry != null) { - entry.keyBytes(txEntry.keyBytes()); - - // Handle implicit locks for pessimistic transactions. 
- if (tx == null) - tx = ctx.tm().nearTx(req.version()); - - if (tx == null) { - tx = new GridNearTxRemote<>( - ctx, - nodeId, - req.nearNodeId(), - // We can pass null as nearXidVer as transaction will be committed right away. - null, - req.threadId(), - req.version(), - null, - req.system(), - PESSIMISTIC, - req.isolation(), - req.isInvalidate(), - 0, - req.txSize(), - req.groupLockKey(), - req.subjectId(), - req.taskNameHash()); - - tx = ctx.tm().onCreated(tx); - - if (tx == null || !ctx.tm().onStarted(tx)) - throw new IgniteTxRollbackException("Failed to acquire lock " + - "(transaction has been completed): " + req.version()); - - if (!marked) - marked = tx.markFinalizing(USER_FINISH); - - if (!marked) - return null; - } - - if (tx.local()) - return null; - - if (!marked) - marked = tx.markFinalizing(USER_FINISH); - - if (marked) - tx.addEntry(cacheCtx, txEntry.txKey(), txEntry.keyBytes(), txEntry.op(), txEntry.value(), - txEntry.valueBytes(), txEntry.drVersion()); - else - return null; - - if (req.groupLock()) { - tx.markGroupLock(); - - if (!txEntry.groupLockEntry()) - tx.groupLockKey(txEntry.txKey()); - } - - // Add remote candidate before reordering. - if (txEntry.explicitVersion() == null && !txEntry.groupLockEntry()) - entry.addRemote( - req.nearNodeId(), - nodeId, - req.threadId(), - req.version(), - 0, - /*tx*/true, - tx.implicitSingle(), - null - ); - } - - // Double-check in case if sender node left the grid. - if (ctx.discovery().node(req.nearNodeId()) == null) { - if (log.isDebugEnabled()) - log.debug("Node requesting lock left grid (lock request will be ignored): " + req); - - if (tx != null) - tx.rollback(); - - return null; - } - - // Entry is legit. 
- break; - } - catch (GridCacheEntryRemovedException ignored) { - assert entry.obsoleteVersion() != null : "Obsolete flag not set on removed entry: " + - entry; - - if (log.isDebugEnabled()) - log.debug("Received entry removed exception (will retry on renewed entry): " + entry); - - if (tx != null) { - tx.clearEntry(txEntry.txKey()); - - if (log.isDebugEnabled()) - log.debug("Cleared removed entry from remote transaction (will retry) [entry=" + - entry + ", tx=" + tx + ']'); - } - - // Will retry in while loop. - } - } - } - } - else { - String err = "Failed to acquire deployment class loader for message: " + req; - - U.warn(log, err); - - throw new IgniteCheckedException(err); - } - - return tx; - } - - /** - * @param nodeId Node ID. - * @param req Request. - */ - protected void processCheckPreparedTxRequest(UUID nodeId, GridCacheOptimisticCheckPreparedTxRequest<K, V> req) { - if (log.isDebugEnabled()) - log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']'); - - boolean prepared = ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions()); - - GridCacheOptimisticCheckPreparedTxResponse<K, V> res = - new GridCacheOptimisticCheckPreparedTxResponse<>(req.version(), req.futureId(), req.miniId(), prepared); - - try { - if (log.isDebugEnabled()) - log.debug("Sending check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']'); - - ctx.io().send(nodeId, res); - } - catch (ClusterTopologyException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send check prepared transaction response (did node leave grid?) [nodeId=" + - nodeId + ", res=" + res + ']'); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send response to node [nodeId=" + nodeId + ", res=" + res + ']', e); - } - } - - /** - * @param nodeId Node ID. - * @param res Response. 
- */ - protected void processCheckPreparedTxResponse(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse<K, V> res) { - if (log.isDebugEnabled()) - log.debug("Processing check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']'); - - GridCacheOptimisticCheckPreparedTxFuture<K, V> fut = (GridCacheOptimisticCheckPreparedTxFuture<K, V>)ctx.mvcc(). - <Boolean>future(res.version(), res.futureId()); - - if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Received response for unknown future (will ignore): " + res); - - return; - } - - fut.onResult(nodeId, res); - } - - /** - * @param nodeId Node ID. - * @param req Request. - */ - protected void processCheckCommittedTxRequest(final UUID nodeId, - final GridCachePessimisticCheckCommittedTxRequest<K, V> req) { - if (log.isDebugEnabled()) - log.debug("Processing check committed transaction request [nodeId=" + nodeId + ", req=" + req + ']'); - - IgniteFuture<GridCacheCommittedTxInfo<K, V>> infoFut = ctx.tm().checkPessimisticTxCommitted(req); - - infoFut.listenAsync(new CI1<IgniteFuture<GridCacheCommittedTxInfo<K, V>>>() { - @Override public void apply(IgniteFuture<GridCacheCommittedTxInfo<K, V>> infoFut) { - GridCacheCommittedTxInfo<K, V> info = null; - - try { - info = infoFut.get(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to obtain committed info for transaction (will rollback): " + req, e); - } - - GridCachePessimisticCheckCommittedTxResponse<K, V> - res = new GridCachePessimisticCheckCommittedTxResponse<>( - req.version(), req.futureId(), req.miniId(), info); - - if (log.isDebugEnabled()) - log.debug("Finished waiting for tx committed info [req=" + req + ", res=" + res + ']'); - - sendCheckCommittedResponse(nodeId, res); } - }); - } - - /** - * @param nodeId Node ID. - * @param res Response. 
- */ - protected void processCheckCommittedTxResponse(UUID nodeId, - GridCachePessimisticCheckCommittedTxResponse<K, V> res) { - if (log.isDebugEnabled()) - log.debug("Processing check committed transaction response [nodeId=" + nodeId + ", res=" + res + ']'); - - GridCachePessimisticCheckCommittedTxFuture<K, V> fut = - (GridCachePessimisticCheckCommittedTxFuture<K, V>)ctx.mvcc().<GridCacheCommittedTxInfo<K, V>>future( - res.version(), res.futureId()); - - if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Received response for unknown future (will ignore): " + res); - - return; - } - - fut.onResult(nodeId, res); - } - - /** - * Sends check committed response to remote node. - * - * @param nodeId Node ID to send to. - * @param res Response to send. - */ - private void sendCheckCommittedResponse(UUID nodeId, GridCachePessimisticCheckCommittedTxResponse<K, V> res) { - try { - if (log.isDebugEnabled()) - log.debug("Sending check committed transaction response [nodeId=" + nodeId + ", res=" + res + ']'); - - ctx.io().send(nodeId, res); - } - catch (ClusterTopologyException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send check committed transaction response (did node leave grid?) 
[nodeId=" + - nodeId + ", res=" + res + ']'); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send response to node [nodeId=" + nodeId + ", res=" + res + ']', e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxKey.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxKey.java deleted file mode 100644 index d804023..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxKey.java +++ /dev/null @@ -1,97 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.gridgain.grid.util.tostring.*; -import org.gridgain.grid.util.typedef.internal.*; - -import java.io.*; - -/** - * Cache transaction key. This wrapper is needed because same keys may be enlisted in the same transaction - * for multiple caches. - */ -public class GridCacheTxKey<K> implements Externalizable { - /** Key. */ - @GridToStringInclude - private K key; - - /** Cache ID. */ - private int cacheId; - - /** - * Empty constructor required for {@link Externalizable}. - */ - public GridCacheTxKey() { - // No-op. - } - - /** - * @param key User key. - * @param cacheId Cache ID. - */ - public GridCacheTxKey(K key, int cacheId) { - this.key = key; - this.cacheId = cacheId; - } - - /** - * @return User key. - */ - public K key() { - return key; - } - - /** - * @return Cache ID. 
- */ - public int cacheId() { - return cacheId; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (!(o instanceof GridCacheTxKey)) - return false; - - GridCacheTxKey that = (GridCacheTxKey)o; - - return cacheId == that.cacheId && key.equals(that.key); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = key.hashCode(); - - res = 31 * res + cacheId; - - return res; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(cacheId); - out.writeObject(key); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheId = in.readInt(); - key = (K)in.readObject(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheTxKey.class, this); - } -}