http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java new file mode 100644 index 0000000..7c8584c --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalEx.java @@ -0,0 +1,167 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache.transactions; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.cache.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.cache.dr.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Local transaction API. + */ +public interface IgniteTxLocalEx<K, V> extends IgniteTxEx<K, V> { + /** + * @return Minimum version involved in transaction. + */ + public GridCacheVersion minVersion(); + + /** + * @return Future for this transaction. + */ + public IgniteFuture<IgniteTxEx<K, V>> future(); + + /** + * @return Commit error. + */ + @Nullable public Throwable commitError(); + + /** + * @param e Commit error. + */ + public void commitError(Throwable e); + + /** + * @throws IgniteCheckedException If commit failed. + */ + public void userCommit() throws IgniteCheckedException; + + /** + * @throws IgniteCheckedException If rollback failed. + */ + public void userRollback() throws IgniteCheckedException; + + /** + * @return Group lock entry if this is a group-lock transaction. + */ + @Nullable public IgniteTxEntry<K, V> groupLockEntry(); + + /** + * @param keys Keys to get. + * @param cached Cached entry if this method is called from entry wrapper. + * Cached entry is passed if and only if there is only one key in collection of keys. + * @param deserializePortable Deserialize portable flag. + * @param filter Entry filter. + * @return Future for this get. + */ + public IgniteFuture<Map<K, V>> getAllAsync( + GridCacheContext<K, V> cacheCtx, + Collection<? extends K> keys, + @Nullable GridCacheEntryEx<K, V> cached, + boolean deserializePortable, + IgnitePredicate<GridCacheEntry<K, V>>[] filter); + + /** + * @param map Map to put. + * @param retval Flag indicating whether a value should be returned. + * @param cached Cached entry, if any. Will be provided only if map has size 1. + * @param filter Filter. + * @param ttl Time to live for entry. If negative, leave unchanged. + * @return Future for put operation. + */ + public IgniteFuture<GridCacheReturn<V>> putAllAsync( + GridCacheContext<K, V> cacheCtx, + Map<? extends K, ? extends V> map, + boolean retval, + @Nullable GridCacheEntryEx<K, V> cached, + long ttl, + IgnitePredicate<GridCacheEntry<K, V>>[] filter); + + /** + * @param map Map to put. + * @return Transform operation future. + */ + public IgniteFuture<GridCacheReturn<V>> transformAllAsync( + GridCacheContext<K, V> cacheCtx, + @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> map, + boolean retval, + @Nullable GridCacheEntryEx<K, V> cached, + long ttl); + + /** + * @param keys Keys to remove. + * @param retval Flag indicating whether a value should be returned. + * @param cached Cached entry, if any. Will be provided only if size of keys collection is 1. + * @param filter Filter. + * @return Future for asynchronous remove. + */ + public IgniteFuture<GridCacheReturn<V>> removeAllAsync( + GridCacheContext<K, V> cacheCtx, + Collection<? extends K> keys, + @Nullable GridCacheEntryEx<K, V> cached, + boolean retval, + IgnitePredicate<GridCacheEntry<K, V>>[] filter); + + /** + * @param drMap DR map to put. + * @return Future for DR put operation. + */ + public IgniteFuture<?> putAllDrAsync( + GridCacheContext<K, V> cacheCtx, + Map<? extends K, GridCacheDrInfo<V>> drMap); + + /** + * @param drMap DR map. + * @return Future for asynchronous remove. + */ + public IgniteFuture<?> removeAllDrAsync( + GridCacheContext<K, V> cacheCtx, + Map<? extends K, GridCacheVersion> drMap); + + /** + * Performs keys locking for affinity-based group lock transactions. + * + * @param keys Keys to lock. + * @return Lock future. + */ + public IgniteFuture<?> groupLockAsync(GridCacheContext<K, V> cacheCtx, Collection<K> keys); + + /** + * @return {@code True} if keys from the same partition are allowed to be enlisted in group-lock transaction. + */ + public boolean partitionLock(); + + /** + * Finishes transaction (either commit or rollback). + * + * @param commit {@code True} if commit, {@code false} if rollback. + * @return {@code True} if state has been changed. + * @throws IgniteCheckedException If finish failed. + */ + public boolean finish(boolean commit) throws IgniteCheckedException; + + /** + * @param async if {@code True}, then loading will happen in a separate thread. + * @param keys Keys. + * @param c Closure. + * @param deserializePortable Deserialize portable flag. + * @return Future with {@code True} value if loading took place. + */ + public IgniteFuture<Boolean> loadMissing( + GridCacheContext<K, V> cacheCtx, + boolean async, + Collection<? extends K> keys, + boolean deserializePortable, + IgniteBiInClosure<K, V> c); +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxManager.java new file mode 100644 index 0000000..a176ff1 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxManager.java @@ -0,0 +1,2213 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache.transactions; + +import org.apache.ignite.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.kernal.managers.eventstorage.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.cache.distributed.*; +import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; +import org.gridgain.grid.kernal.processors.cache.distributed.near.*; +import org.gridgain.grid.kernal.processors.timeout.*; +import org.gridgain.grid.util.*; +import org.gridgain.grid.util.future.*; +import org.gridgain.grid.util.lang.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.IgniteSystemProperties.*; +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxState.*; +import static org.gridgain.grid.kernal.processors.cache.transactions.IgniteTxEx.FinalizationStatus.*; +import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*; +import static org.gridgain.grid.util.GridConcurrentFactory.*; + +/** + * Cache transaction manager. + */ +public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { + /** Default maximum number of transactions that have completed. */ + private static final int DFLT_MAX_COMPLETED_TX_CNT = 262144; // 2^18 + + /** Slow tx warn timeout (initialized to 0). */ + private static final int SLOW_TX_WARN_TIMEOUT = Integer.getInteger(GG_SLOW_TX_WARN_TIMEOUT, 0); + + /** Tx salvage timeout (default 3s). */ + private static final int TX_SALVAGE_TIMEOUT = Integer.getInteger(GG_TX_SALVAGE_TIMEOUT, 100); + + /** Committing transactions. */ + private final ThreadLocal<IgniteTxEx> threadCtx = new GridThreadLocalEx<>(); + + /** Per-thread transaction map. */ + private final ConcurrentMap<Long, IgniteTxEx<K, V>> threadMap = newMap(); + + /** Per-ID map. */ + private final ConcurrentMap<GridCacheVersion, IgniteTxEx<K, V>> idMap = newMap(); + + /** Per-ID map for near transactions. */ + private final ConcurrentMap<GridCacheVersion, IgniteTxEx<K, V>> nearIdMap = newMap(); + + /** TX handler. */ + private IgniteTxHandler<K, V> txHandler; + + /** All transactions. */ + private final Queue<IgniteTxEx<K, V>> committedQ = new ConcurrentLinkedDeque8<>(); + + /** Preparing transactions. */ + private final Queue<IgniteTxEx<K, V>> prepareQ = new ConcurrentLinkedDeque8<>(); + + /** Minimum start version. */ + private final ConcurrentNavigableMap<GridCacheVersion, AtomicInt> startVerCnts = + new ConcurrentSkipListMap<>(); + + /** Committed local transactions. */ + private final GridBoundedConcurrentOrderedSet<GridCacheVersion> committedVers = + new GridBoundedConcurrentOrderedSet<>(Integer.getInteger(GG_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT)); + + /** Rolled back local transactions. */ + private final NavigableSet<GridCacheVersion> rolledbackVers = + new GridBoundedConcurrentOrderedSet<>(Integer.getInteger(GG_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT)); + + /** Pessimistic commit buffer. */ + private GridCacheTxCommitBuffer<K, V> pessimisticRecoveryBuf; + + /** Transaction synchronizations. */ + private final Collection<IgniteTxSynchronization> syncs = + new GridConcurrentHashSet<>(); + + /** Transaction finish synchronizer. */ + private GridCacheTxFinishSync<K, V> txFinishSync; + + /** For test purposes only. */ + private boolean finishSyncDisabled; + + /** Slow tx warn timeout. */ + private int slowTxWarnTimeout = SLOW_TX_WARN_TIMEOUT; + + /** + * Near version to DHT version map. Note that we initialize to 5K size from get go, + * to avoid future map resizings. + */ + private final ConcurrentMap<GridCacheVersion, GridCacheVersion> mappedVers = + new ConcurrentHashMap8<>(5120); + + /** {@inheritDoc} */ + @Override protected void onKernalStart0() { + cctx.gridEvents().addLocalEventListener( + new GridLocalEventListener() { + @Override public void onEvent(IgniteEvent evt) { + assert evt instanceof IgniteDiscoveryEvent; + assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; + + IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt; + + cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(discoEvt.eventNode().id())); + + if (txFinishSync != null) + txFinishSync.onNodeLeft(discoEvt.eventNode().id()); + } + }, + EVT_NODE_FAILED, EVT_NODE_LEFT); + + for (IgniteTxEx<K, V> tx : idMap.values()) { + if ((!tx.local() || tx.dht()) && !cctx.discovery().aliveAll(tx.masterNodeIds())) { + if (log.isDebugEnabled()) + log.debug("Remaining transaction from left node: " + tx); + + salvageTx(tx, true, USER_FINISH); + } + } + } + + /** {@inheritDoc} */ + @Override protected void start0() throws IgniteCheckedException { + pessimisticRecoveryBuf = new GridCachePerThreadTxCommitBuffer<>(cctx); + + txFinishSync = new GridCacheTxFinishSync<>(cctx); + + txHandler = new IgniteTxHandler<>(cctx); + } + + /** + * @return TX handler. + */ + public IgniteTxHandler<K, V> txHandler() { + return txHandler; + } + + /** + * Invalidates transaction. + * + * @param tx Transaction. + * @return {@code True} if transaction was salvaged by this call. + */ + public boolean salvageTx(IgniteTxEx<K, V> tx) { + return salvageTx(tx, false, USER_FINISH); + } + + /** + * Invalidates transaction. + * + * @param tx Transaction. + * @param warn {@code True} if warning should be logged. + * @param status Finalization status. + * @return {@code True} if transaction was salvaged by this call. + */ + private boolean salvageTx(IgniteTxEx<K, V> tx, boolean warn, IgniteTxEx.FinalizationStatus status) { + assert tx != null; + + IgniteTxState state = tx.state(); + + if (state == ACTIVE || state == PREPARING || state == PREPARED) { + try { + if (!tx.markFinalizing(status)) { + if (log.isDebugEnabled()) + log.debug("Will not try to commit invalidate transaction (could not mark finalized): " + tx); + + return false; + } + + tx.systemInvalidate(true); + + tx.prepare(); + + if (tx.state() == PREPARING) { + if (log.isDebugEnabled()) + log.debug("Ignoring transaction in PREPARING state as it is currently handled " + + "by another thread: " + tx); + + return false; + } + + if (tx instanceof IgniteTxRemoteEx) { + IgniteTxRemoteEx<K, V> rmtTx = (IgniteTxRemoteEx<K, V>)tx; + + rmtTx.doneRemote(tx.xidVersion(), Collections.<GridCacheVersion>emptyList(), + Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList()); + } + + tx.commit(); + + if (warn) { + // This print out cannot print any peer-deployed entity either + // directly or indirectly. + U.warn(log, "Invalidated transaction because originating node either " + + "crashed or left grid: " + CU.txString(tx)); + } + } + catch (IgniteTxOptimisticException ignore) { + if (log.isDebugEnabled()) + log.debug("Optimistic failure while invalidating transaction (will rollback): " + + tx.xidVersion()); + + try { + tx.rollback(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to rollback transaction: " + tx.xidVersion(), e); + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to invalidate transaction: " + tx, e); + } + } + else if (state == MARKED_ROLLBACK) { + try { + tx.rollback(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to rollback transaction: " + tx.xidVersion(), e); + } + } + + return true; + } + + /** + * Prints out memory stats to standard out. + * <p> + * USE ONLY FOR MEMORY PROFILING DURING TESTS. + */ + @Override public void printMemoryStats() { + IgniteTxEx<K, V> firstTx = committedQ.peek(); + + int committedSize = committedQ.size(); + + Map.Entry<GridCacheVersion, AtomicInt> startVerEntry = startVerCnts.firstEntry(); + + GridCacheVersion minStartVer = null; + long dur = 0; + + if (committedSize > 3000) { + minStartVer = new GridCacheVersion(Integer.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE, 0); + + IgniteTxEx<K, V> stuck = null; + + for (IgniteTxEx<K, V> tx : txs()) + if (tx.startVersion().isLess(minStartVer)) { + minStartVer = tx.startVersion(); + dur = U.currentTimeMillis() - tx.startTime(); + + stuck = tx; + } + + X.println("Stuck transaction: " + stuck); + } + + X.println(">>> "); + X.println(">>> Transaction manager memory stats [grid=" + cctx.gridName() + ']'); + X.println(">>> threadMapSize: " + threadMap.size()); + X.println(">>> idMap [size=" + idMap.size() + ", minStartVer=" + minStartVer + ", dur=" + dur + "ms]"); + X.println(">>> committedQueue [size=" + committedSize + + ", firstStartVersion=" + (firstTx == null ? "null" : firstTx.startVersion()) + + ", firstEndVersion=" + (firstTx == null ? "null" : firstTx.endVersion()) + ']'); + X.println(">>> prepareQueueSize: " + prepareQ.size()); + X.println(">>> startVerCntsSize [size=" + startVerCnts.size() + + ", firstVer=" + startVerEntry + ']'); + X.println(">>> committedVersSize: " + committedVers.size()); + X.println(">>> rolledbackVersSize: " + rolledbackVers.size()); + + if (pessimisticRecoveryBuf != null) + X.println(">>> pessimsticCommitBufSize: " + pessimisticRecoveryBuf.size()); + } + + /** + * @return Thread map size. + */ + public int threadMapSize() { + return threadMap.size(); + } + + /** + * @return ID map size. + */ + public int idMapSize() { + return idMap.size(); + } + + /** + * @return Committed queue size. + */ + public int commitQueueSize() { + return committedQ.size(); + } + + /** + * @return Prepare queue size. + */ + public int prepareQueueSize() { + return prepareQ.size(); + } + + /** + * @return Start version counts. + */ + public int startVersionCountsSize() { + return startVerCnts.size(); + } + + /** + * @return Committed versions size. + */ + public int committedVersionsSize() { + return committedVers.size(); + } + + /** + * @return Rolled back versions size. + */ + public int rolledbackVersionsSize() { + return rolledbackVers.size(); + } + + /** + * + * @param tx Transaction to check. + * @return {@code True} if transaction has been committed or rolled back, + * {@code false} otherwise. + */ + public boolean isCompleted(IgniteTxEx<K, V> tx) { + return committedVers.contains(tx.xidVersion()) || rolledbackVers.contains(tx.xidVersion()); + } + + /** + * @param implicit {@code True} if transaction is implicit. + * @param implicitSingle Implicit-with-single-key flag. + * @param concurrency Concurrency. + * @param isolation Isolation. + * @param timeout transaction timeout. + * @param txSize Expected transaction size. + * @param grpLockKey Group lock key if this is a group-lock transaction. + * @param partLock {@code True} if partition is locked. + * @return New transaction. + */ + public IgniteTxLocalAdapter<K, V> newTx( + boolean implicit, + boolean implicitSingle, + boolean sys, + IgniteTxConcurrency concurrency, + IgniteTxIsolation isolation, + long timeout, + boolean invalidate, + boolean storeEnabled, + int txSize, + @Nullable IgniteTxKey grpLockKey, + boolean partLock) { + UUID subjId = null; // TODO GG-9141 how to get subj ID? + + int taskNameHash = cctx.kernalContext().job().currentTaskNameHash(); + + GridNearTxLocal<K, V> tx = new GridNearTxLocal<>( + cctx, + implicit, + implicitSingle, + sys, + concurrency, + isolation, + timeout, + invalidate, + storeEnabled, + txSize, + grpLockKey, + partLock, + subjId, + taskNameHash); + + return onCreated(tx); + } + + /** + * @param tx Created transaction. + * @return Started transaction. + */ + @Nullable public <T extends IgniteTxEx<K, V>> T onCreated(T tx) { + ConcurrentMap<GridCacheVersion, IgniteTxEx<K, V>> txIdMap = transactionMap(tx); + + // Start clean. + txContextReset(); + + if (isCompleted(tx)) { + if (log.isDebugEnabled()) + log.debug("Attempt to create a completed transaction (will ignore): " + tx); + + return null; + } + + IgniteTxEx<K, V> t; + + if ((t = txIdMap.putIfAbsent(tx.xidVersion(), tx)) == null) { + // Add both, explicit and implicit transactions. + // Do not add remote and dht local transactions as remote node may have the same thread ID + // and overwrite local transaction. + if (tx.local() && !tx.dht()) + threadMap.put(tx.threadId(), tx); + + // Handle mapped versions. + if (tx instanceof GridCacheMappedVersion) { + GridCacheMappedVersion mapped = (GridCacheMappedVersion)tx; + + GridCacheVersion from = mapped.mappedVersion(); + + if (from != null) + mappedVers.put(from, tx.xidVersion()); + + if (log.isDebugEnabled()) + log.debug("Added transaction version mapping [from=" + from + ", to=" + tx.xidVersion() + + ", tx=" + tx + ']'); + } + } + else { + if (log.isDebugEnabled()) + log.debug("Attempt to create an existing transaction (will ignore) [newTx=" + tx + ", existingTx=" + + t + ']'); + + return null; + } + + if (cctx.txConfig().isTxSerializableEnabled()) { + AtomicInt next = new AtomicInt(1); + + boolean loop = true; + + while (loop) { + AtomicInt prev = startVerCnts.putIfAbsent(tx.startVersion(), next); + + if (prev == null) + break; // Put succeeded - exit. + + // Previous value was 0, which means that it will be deleted + // by another thread in "decrementStartVersionCount(..)" method. + // In that case, we delete here too, so we can safely try again. + for (;;) { + int p = prev.get(); + + assert p >= 0 : p; + + if (p == 0) { + if (startVerCnts.remove(tx.startVersion(), prev)) + if (log.isDebugEnabled()) + log.debug("Removed count from onCreated callback: " + tx); + + break; // Retry outer loop. + } + + if (prev.compareAndSet(p, p + 1)) { + loop = false; // Increment succeeded - exit outer loop. + + break; + } + } + } + } + + if (tx.timeout() > 0) { + cctx.time().addTimeoutObject(tx); + + if (log.isDebugEnabled()) + log.debug("Registered transaction with timeout processor: " + tx); + } + + if (log.isDebugEnabled()) + log.debug("Transaction created: " + tx); + + return tx; + } + + /** + * Creates a future that will wait for all ongoing transactions that maybe affected by topology update + * to be finished. This set of transactions include + * <ul> + * <li/> All {@link IgniteTxConcurrency#PESSIMISTIC} transactions with topology version + * less or equal to {@code topVer}. + * <li/> {@link IgniteTxConcurrency#OPTIMISTIC} transactions in PREPARING state with topology + * version less or equal to {@code topVer} and having transaction key with entry that belongs to + * one of partitions in {@code parts}. + * </ul> + * + * @param topVer Topology version. + * @return Future that will be completed when all ongoing transactions are finished. + */ + public IgniteFuture<Boolean> finishTxs(long topVer) { + GridCompoundFuture<IgniteTx, Boolean> res = + new GridCompoundFuture<>(context().kernalContext(), + new IgniteReducer<IgniteTx, Boolean>() { + @Override public boolean collect(IgniteTx e) { + return true; + } + + @Override public Boolean reduce() { + return true; + } + }); + + for (IgniteTxEx<K, V> tx : txs()) { + // Must wait for all transactions, even for DHT local and DHT remote since preloading may acquire + // values pending to be overwritten by prepared transaction. + + if (tx.concurrency() == PESSIMISTIC) { + if (tx.topologyVersion() > 0 && tx.topologyVersion() < topVer) + // For PESSIMISTIC mode we must wait for all uncompleted txs + // as we do not know in advance which keys will participate in tx. + res.add(tx.finishFuture()); + } + else if (tx.concurrency() == OPTIMISTIC) { + // For OPTIMISTIC mode we wait only for txs in PREPARING state that + // have keys for given partitions. + IgniteTxState state = tx.state(); + long txTopVer = tx.topologyVersion(); + + if ((state == PREPARING || state == PREPARED || state == COMMITTING) + && txTopVer > 0 && txTopVer < topVer) { + res.add(tx.finishFuture()); + } + } + } + + res.markInitialized(); + + return res; + } + + /** + * Transaction start callback (has to do with when any operation was + * performed on this transaction). + * + * @param tx Started transaction. + * @return {@code True} if transaction is not in completed set. + */ + public boolean onStarted(IgniteTxEx<K, V> tx) { + assert tx.state() == ACTIVE || tx.isRollbackOnly() : "Invalid transaction state [locId=" + cctx.localNodeId() + + ", tx=" + tx + ']'; + + if (isCompleted(tx)) { + if (log.isDebugEnabled()) + log.debug("Attempt to start a completed transaction (will ignore): " + tx); + + return false; + } + + onTxStateChange(null, ACTIVE, tx); + + if (log.isDebugEnabled()) + log.debug("Transaction started: " + tx); + + return true; + } + + /** + * Reverse mapped version look up. + * + * @param dhtVer Dht version. + * @return Near version. + */ + @Nullable public GridCacheVersion nearVersion(GridCacheVersion dhtVer) { + IgniteTxEx<K, V> tx = idMap.get(dhtVer); + + if (tx != null) + return tx.nearXidVersion(); + + return null; + } + + /** + * @param from Near version. + * @return DHT version for a near version. + */ + public GridCacheVersion mappedVersion(GridCacheVersion from) { + GridCacheVersion to = mappedVers.get(from); + + if (log.isDebugEnabled()) + log.debug("Found mapped version [from=" + from + ", to=" + to); + + return to; + } + + /** + * + * @param ver Alternate version. + * @param tx Transaction. + */ + public void addAlternateVersion(GridCacheVersion ver, IgniteTxEx<K, V> tx) { + if (idMap.putIfAbsent(ver, tx) == null) + if (log.isDebugEnabled()) + log.debug("Registered alternate transaction version [ver=" + ver + ", tx=" + tx + ']'); + } + + /** + * @return Local transaction. + */ + @SuppressWarnings({"unchecked"}) + @Nullable public <T> T localTx() { + IgniteTxEx<K, V> tx = tx(); + + return tx != null && tx.local() ? (T)tx : null; + } + + /** + * @return Transaction for current thread. + */ + @SuppressWarnings({"unchecked"}) + public <T> T threadLocalTx() { + IgniteTxEx<K, V> tx = tx(Thread.currentThread().getId()); + + return tx != null && tx.local() && (!tx.dht() || tx.colocated()) && !tx.implicit() ? (T)tx : null; + } + + /** + * @return Transaction for current thread. + */ + @SuppressWarnings({"unchecked", "RedundantCast"}) + public <T> T tx() { + IgniteTxEx<K, V> tx = txContext(); + + return tx != null ? (T)tx : (T)tx(Thread.currentThread().getId()); + } + + /** + * @return Local transaction. + */ + @Nullable public IgniteTxEx<K, V> localTxx() { + IgniteTxEx<K, V> tx = txx(); + + return tx != null && tx.local() ? tx : null; + } + + /** + * @return Transaction for current thread. + */ + @SuppressWarnings({"unchecked"}) + public IgniteTxEx<K, V> txx() { + return tx(); + } + + /** + * @return User transaction for current thread. + */ + @Nullable public IgniteTx userTx() { + IgniteTxEx<K, V> tx = txContext(); + + if (tx != null && tx.user() && tx.state() == ACTIVE) + return tx; + + tx = tx(Thread.currentThread().getId()); + + return tx != null && tx.user() && tx.state() == ACTIVE ? tx : null; + } + + /** + * @return User transaction. + */ + @SuppressWarnings({"unchecked"}) + @Nullable public <T extends IgniteTxLocalEx<K, V>> T userTxx() { + return (T)userTx(); + } + + /** + * @param threadId Id of thread for transaction. + * @return Transaction for thread with given ID. + */ + @SuppressWarnings({"unchecked"}) + public <T> T tx(long threadId) { + return (T)threadMap.get(threadId); + } + + /** + * @return {@code True} if current thread is currently within transaction. + */ + public boolean inUserTx() { + return userTx() != null; + } + + /** + * @param txId Transaction ID. + * @return Transaction with given ID. + */ + @SuppressWarnings({"unchecked"}) + @Nullable public <T extends IgniteTxEx<K, V>> T tx(GridCacheVersion txId) { + return (T)idMap.get(txId); + } + + /** + * @param txId Transaction ID. + * @return Transaction with given ID. + */ + @SuppressWarnings({"unchecked"}) + @Nullable public <T extends IgniteTxEx<K, V>> T nearTx(GridCacheVersion txId) { + return (T)nearIdMap.get(txId); + } + + /** + * @param txId Transaction ID. + * @return Transaction with given ID. + */ + @Nullable public IgniteTxEx<K, V> txx(GridCacheVersion txId) { + return idMap.get(txId); + } + + /** + * Handles prepare stage of 2PC. + * + * @param tx Transaction to prepare. + * @throws IgniteCheckedException If preparation failed. + */ + public void prepareTx(IgniteTxEx<K, V> tx) throws IgniteCheckedException { + if (tx.state() == MARKED_ROLLBACK) { + if (tx.timedOut()) + throw new IgniteTxTimeoutException("Transaction timed out: " + this); + + throw new IgniteCheckedException("Transaction is marked for rollback: " + tx); + } + + if (tx.remainingTime() == 0) { + tx.setRollbackOnly(); + + throw new IgniteTxTimeoutException("Transaction timed out: " + this); + } + + boolean txSerializableEnabled = cctx.txConfig().isTxSerializableEnabled(); + + // Clean up committed transactions queue. + if (tx.pessimistic()) { + if (tx.enforceSerializable() && txSerializableEnabled) { + for (Iterator<IgniteTxEx<K, V>> it = committedQ.iterator(); it.hasNext();) { + IgniteTxEx<K, V> committedTx = it.next(); + + assert committedTx != tx; + + // Clean up. + if (isSafeToForget(committedTx)) + it.remove(); + } + } + + // Nothing else to do in pessimistic mode. + return; + } + + if (txSerializableEnabled && tx.optimistic() && tx.enforceSerializable()) { + Set<IgniteTxKey<K>> readSet = tx.readSet(); + Set<IgniteTxKey<K>> writeSet = tx.writeSet(); + + GridCacheVersion startTn = tx.startVersion(); + + GridCacheVersion finishTn = cctx.versions().last(); + + // Add future to prepare queue only on first prepare call. + if (tx.markPreparing()) + prepareQ.offer(tx); + + // Check that our read set does not intersect with write set + // of all transactions that completed their write phase + // while our transaction was in read phase. + for (Iterator<IgniteTxEx<K, V>> it = committedQ.iterator(); it.hasNext();) { + IgniteTxEx<K, V> committedTx = it.next(); + + assert committedTx != tx; + + // Clean up. + if (isSafeToForget(committedTx)) { + it.remove(); + + continue; + } + + GridCacheVersion tn = committedTx.endVersion(); + + // We only care about transactions + // with tn > startTn and tn <= finishTn + if (tn.compareTo(startTn) <= 0 || tn.compareTo(finishTn) > 0) + continue; + + if (tx.serializable()) { + if (GridFunc.intersects(committedTx.writeSet(), readSet)) { + tx.setRollbackOnly(); + + throw new IgniteTxOptimisticException("Failed to prepare transaction " + + "(committed vs. read-set conflict): " + tx); + } + } + } + + // Check that our read and write sets do not intersect with write + // sets of all active transactions. + for (Iterator<IgniteTxEx<K, V>> iter = prepareQ.iterator(); iter.hasNext();) { + IgniteTxEx<K, V> prepareTx = iter.next(); + + if (prepareTx == tx) + // Skip yourself. + continue; + + // Optimistically remove completed transactions. + if (prepareTx.done()) { + iter.remove(); + + if (log.isDebugEnabled()) + log.debug("Removed finished transaction from active queue: " + prepareTx); + + continue; + } + + // Check if originating node left. + if (cctx.discovery().node(prepareTx.nodeId()) == null) { + iter.remove(); + + rollbackTx(prepareTx); + + if (log.isDebugEnabled()) + log.debug("Removed and rolled back transaction because sender node left grid: " + + CU.txString(prepareTx)); + + continue; + } + + if (tx.serializable() && !prepareTx.isRollbackOnly()) { + Set<IgniteTxKey<K>> prepareWriteSet = prepareTx.writeSet(); + + if (GridFunc.intersects(prepareWriteSet, readSet, writeSet)) { + // Remove from active set. + iter.remove(); + + tx.setRollbackOnly(); + + throw new IgniteTxOptimisticException( + "Failed to prepare transaction (read-set/write-set conflict): " + tx); + } + } + } + } + + // Optimistic. + assert tx.optimistic(); + + if (!lockMultiple(tx, tx.optimisticLockEntries())) { + tx.setRollbackOnly(); + + throw new IgniteTxOptimisticException("Failed to prepare transaction (lock conflict): " + tx); + } + } + + /** + * @param tx Transaction to check. + * @return {@code True} if transaction can be discarded. + */ + private boolean isSafeToForget(IgniteTxEx<K, V> tx) { + Map.Entry<GridCacheVersion, AtomicInt> e = startVerCnts.firstEntry(); + + if (e == null) + return true; + + assert e.getValue().get() >= 0; + + return tx.endVersion().compareTo(e.getKey()) <= 0; + } + + /** + * Decrement start version count. + * + * @param tx Cache transaction. + */ + private void decrementStartVersionCount(IgniteTxEx<K, V> tx) { + AtomicInt cnt = startVerCnts.get(tx.startVersion()); + + assert cnt != null : "Failed to find start version count for transaction [startVerCnts=" + startVerCnts + + ", tx=" + tx + ']'; + + assert cnt.get() > 0; + + if (cnt.decrementAndGet() == 0) + if (startVerCnts.remove(tx.startVersion(), cnt)) + if (log.isDebugEnabled()) + log.debug("Removed start version for transaction: " + tx); + } + + /** + * @param tx Transaction. + */ + private void removeObsolete(IgniteTxEx<K, V> tx) { + Collection<IgniteTxEntry<K, V>> entries = (tx.local() && !tx.dht()) ? tx.allEntries() : tx.writeEntries(); + + for (IgniteTxEntry<K, V> entry : entries) { + GridCacheEntryEx<K, V> cached = entry.cached(); + + GridCacheContext<K, V> cacheCtx = entry.context(); + + if (cached == null) + cached = cacheCtx.cache().peekEx(entry.key()); + + if (cached.detached()) + continue; + + try { + if (cached.obsolete() || cached.markObsoleteIfEmpty(tx.xidVersion())) + cacheCtx.cache().removeEntry(cached); + + if (!tx.near() && isNearEnabled(cacheCtx)) { + GridNearCacheAdapter<K, V> near = cacheCtx.isNear() ? cacheCtx.near() : cacheCtx.dht().near(); + + GridNearCacheEntry<K, V> e = near.peekExx(entry.key()); + + if (e != null && e.markObsoleteIfEmpty(tx.xidVersion())) + near.removeEntry(e); + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to remove obsolete entry from cache: " + cached, e); + } + } + } + + /** + * @param c Collection to copy. + * @return Copy of the collection. + */ + private Collection<GridCacheVersion> copyOf(Iterable<GridCacheVersion> c) { + Collection<GridCacheVersion> l = new LinkedList<>(); + + for (GridCacheVersion v : c) + l.add(v); + + return l; + } + + /** + * Gets committed transactions starting from the given version (inclusive). // TODO: GG-4011: why inclusive? + * + * @param min Start (or minimum) version. + * @return Committed transactions starting from the given version (non-inclusive). + */ + public Collection<GridCacheVersion> committedVersions(GridCacheVersion min) { + Set<GridCacheVersion> set = committedVers.tailSet(min, true); + + return set == null || set.isEmpty() ? Collections.<GridCacheVersion>emptyList() : copyOf(set); + } + + /** + * Gets rolledback transactions starting from the given version (inclusive). // TODO: GG-4011: why inclusive? + * + * @param min Start (or minimum) version. + * @return Committed transactions starting from the given version (non-inclusive). + */ + public Collection<GridCacheVersion> rolledbackVersions(GridCacheVersion min) { + Set<GridCacheVersion> set = rolledbackVers.tailSet(min, true); + + return set == null || set.isEmpty() ? Collections.<GridCacheVersion>emptyList() : copyOf(set); + } + + /** + * @param tx Tx to remove. + */ + public void removeCommittedTx(IgniteTxEx<K, V> tx) { + committedVers.remove(tx.xidVersion()); + } + + /** + * @param tx Committed transaction. + * @return If transaction was not already present in committed set. + */ + public boolean addCommittedTx(IgniteTxEx<K, V> tx) { + return addCommittedTx(tx.xidVersion(), tx.nearXidVersion()); + } + + /** + * @param tx Committed transaction. + * @return If transaction was not already present in committed set. + */ + public boolean addRolledbackTx(IgniteTxEx<K, V> tx) { + return addRolledbackTx(tx.xidVersion()); + } + + /** + * @param xidVer Completed transaction version. + * @param nearXidVer Optional near transaction ID. + * @return If transaction was not already present in completed set. + */ + public boolean addCommittedTx(GridCacheVersion xidVer, @Nullable GridCacheVersion nearXidVer) { + assert !rolledbackVers.contains(xidVer) : "Version was rolled back: " + xidVer; + + if (nearXidVer != null) + xidVer = new CommittedVersion(xidVer, nearXidVer); + + if (committedVers.add(xidVer)) { + if (log.isDebugEnabled()) + log.debug("Added transaction to committed version set: " + xidVer); + + return true; + } + else { + if (log.isDebugEnabled()) + log.debug("Transaction is already present in committed version set: " + xidVer); + + return false; + } + } + + /** + * @param xidVer Completed transaction version. + * @return If transaction was not already present in completed set. + */ + public boolean addRolledbackTx(GridCacheVersion xidVer) { + assert !committedVers.contains(xidVer); + + if (rolledbackVers.add(xidVer)) { + if (log.isDebugEnabled()) + log.debug("Added transaction to rolled back version set: " + xidVer); + + return true; + } + else { + if (log.isDebugEnabled()) + log.debug("Transaction is already present in rolled back version set: " + xidVer); + + return false; + } + } + + /** + * @param tx Transaction. + */ + private void processCompletedEntries(IgniteTxEx<K, V> tx) { + if (tx.needsCompletedVersions()) { + GridCacheVersion min = minVersion(tx.readEntries(), tx.xidVersion(), tx); + + min = minVersion(tx.writeEntries(), min, tx); + + assert min != null; + + tx.completedVersions(min, committedVersions(min), rolledbackVersions(min)); + } + } + + /** + * Collects versions for all pending locks for all entries within transaction + * + * @param dhtTxLoc Transaction being committed. + */ + private void collectPendingVersions(GridDhtTxLocal<K, V> dhtTxLoc) { + if (dhtTxLoc.needsCompletedVersions()) { + if (log.isDebugEnabled()) + log.debug("Checking for pending locks with version less then tx version: " + dhtTxLoc); + + Set<GridCacheVersion> vers = new LinkedHashSet<>(); + + collectPendingVersions(dhtTxLoc.readEntries(), dhtTxLoc.xidVersion(), vers); + collectPendingVersions(dhtTxLoc.writeEntries(), dhtTxLoc.xidVersion(), vers); + + if (!vers.isEmpty()) + dhtTxLoc.pendingVersions(vers); + } + } + + /** + * Gets versions of all not acquired locks for collection of tx entries that are less then base version. + * + * @param entries Tx entries to process. + * @param baseVer Base version to compare with. + * @param vers Collection of versions that will be populated. + */ + @SuppressWarnings("TypeMayBeWeakened") + private void collectPendingVersions(Iterable<IgniteTxEntry<K, V>> entries, + GridCacheVersion baseVer, Set<GridCacheVersion> vers) { + + // The locks are not released yet, so we can safely list pending candidates versions. + for (IgniteTxEntry<K, V> txEntry : entries) { + GridCacheEntryEx<K, V> cached = txEntry.cached(); + + try { + // If check should be faster then exception handling. + if (!cached.obsolete()) { + for (GridCacheMvccCandidate<K> cand : cached.localCandidates()) { + if (!cand.owner() && cand.version().compareTo(baseVer) < 0) { + if (log.isDebugEnabled()) + log.debug("Adding candidate version to pending set: " + cand); + + vers.add(cand.version()); + } + } + } + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("There are no pending locks for entry (entry was deleted in transaction): " + txEntry); + } + } + } + + /** + * Go through all candidates for entries involved in transaction and find their min + * version. We know that these candidates will commit after this transaction, and + * therefore we can grab the min version so we can send all committed and rolled + * back versions from min to current to remote nodes for re-ordering. + * + * @param entries Entries. + * @param min Min version so far. + * @param tx Transaction. + * @return Minimal available version. + */ + private GridCacheVersion minVersion(Iterable<IgniteTxEntry<K, V>> entries, GridCacheVersion min, + IgniteTxEx<K, V> tx) { + for (IgniteTxEntry<K, V> txEntry : entries) { + GridCacheEntryEx<K, V> cached = txEntry.cached(); + + // We are assuming that this method is only called on commit. In that + // case, if lock is held, entry can never be removed. + assert txEntry.isRead() || !cached.obsolete(tx.xidVersion()) : + "Invalid obsolete version for transaction [entry=" + cached + ", tx=" + tx + ']'; + + for (GridCacheMvccCandidate<K> cand : cached.remoteMvccSnapshot()) + if (min == null || cand.version().isLess(min)) + min = cand.version(); + } + + return min; + } + + /** + * Commits a transaction. + * + * @param tx Transaction to commit. + */ + public void commitTx(IgniteTxEx<K, V> tx) { + assert tx != null; + assert tx.state() == COMMITTING : "Invalid transaction state for commit from tm [state=" + tx.state() + + ", expected=COMMITTING, tx=" + tx + ']'; + + if (log.isDebugEnabled()) + log.debug("Committing from TM [locNodeId=" + cctx.localNodeId() + ", tx=" + tx + ']'); + + if (tx.timeout() > 0) { + cctx.time().removeTimeoutObject(tx); + + if (log.isDebugEnabled()) + log.debug("Unregistered transaction with timeout processor: " + tx); + } + + /* + * Note that write phase is handled by transaction adapter itself, + * so we don't do it here. + */ + + // 1. Make sure that committed version has been recorded. + if (!(committedVers.contains(tx.xidVersion()) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) { + uncommitTx(tx); + + throw new IgniteException("Missing commit version (consider increasing " + + GG_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + ", firstVer=" + + committedVers.firstx() + ", lastVer=" + committedVers.lastx() + ", tx=" + tx.xid() + ']'); + } + + ConcurrentMap<GridCacheVersion, IgniteTxEx<K, V>> txIdMap = transactionMap(tx); + + if (txIdMap.remove(tx.xidVersion(), tx)) { + // 2. Must process completed entries before unlocking! + processCompletedEntries(tx); + + if (tx instanceof GridDhtTxLocal) { + GridDhtTxLocal<K, V> dhtTxLoc = (GridDhtTxLocal<K, V>)tx; + + collectPendingVersions(dhtTxLoc); + } + + // 3.1 Call dataStructures manager. + for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts()) + cacheCtx.dataStructures().onTxCommitted(tx); + + // 3.2 Add to pessimistic commit buffer if needed. + addPessimisticRecovery(tx); + + // 4. Unlock write resources. + if (tx.groupLock()) + unlockGroupLocks(tx); + else + unlockMultiple(tx, tx.writeEntries()); + + // 5. For pessimistic transaction, unlock read resources if required. + if (tx.pessimistic() && !tx.readCommitted() && !tx.groupLock()) + unlockMultiple(tx, tx.readEntries()); + + // 6. Notify evictions. + notifyEvitions(tx); + + // 7. Remove obsolete entries from cache. + removeObsolete(tx); + + // 8. Assign transaction number at the end of transaction. + tx.endVersion(cctx.versions().next(tx.topologyVersion())); + + // 9. Clean start transaction number for this transaction. + if (cctx.txConfig().isTxSerializableEnabled()) + decrementStartVersionCount(tx); + + // 10. Add to committed queue only if it is possible + // that this transaction can affect other ones. + if (cctx.txConfig().isTxSerializableEnabled() && tx.enforceSerializable() && !isSafeToForget(tx)) + committedQ.add(tx); + + // 11. Remove from per-thread storage. + if (tx.local() && !tx.dht()) + threadMap.remove(tx.threadId(), tx); + + // 12. Unregister explicit locks. + if (!tx.alternateVersions().isEmpty()) { + for (GridCacheVersion ver : tx.alternateVersions()) + idMap.remove(ver); + } + + // 13. Remove Near-2-DHT mappings. + if (tx instanceof GridCacheMappedVersion) { + GridCacheVersion mapped = ((GridCacheMappedVersion)tx).mappedVersion(); + + if (mapped != null) + mappedVers.remove(mapped); + } + + // 14. Clear context. + txContextReset(); + + // 15. Update metrics. + if (!tx.dht() && tx.local()) { + cctx.txMetrics().onTxCommit(); + + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + + cacheCtx.cache().metrics0().onTxCommit(); + } + } + + if (slowTxWarnTimeout > 0 && tx.local() && + U.currentTimeMillis() - tx.startTime() > slowTxWarnTimeout) + U.warn(log, "Slow transaction detected [tx=" + tx + + ", slowTxWarnTimeout=" + slowTxWarnTimeout + ']') ; + + if (log.isDebugEnabled()) + log.debug("Committed from TM [locNodeId=" + cctx.localNodeId() + ", tx=" + tx + ']'); + } + else if (log.isDebugEnabled()) + log.debug("Did not commit from TM (was already committed): " + tx); + } + + /** + * Rolls back a transaction. + * + * @param tx Transaction to rollback. + */ + public void rollbackTx(IgniteTxEx<K, V> tx) { + assert tx != null; + + if (log.isDebugEnabled()) + log.debug("Rolling back from TM [locNodeId=" + cctx.localNodeId() + ", tx=" + tx + ']'); + + // 1. Record transaction version to avoid duplicates. + addRolledbackTx(tx); + + ConcurrentMap<GridCacheVersion, IgniteTxEx<K, V>> txIdMap = transactionMap(tx); + + if (txIdMap.remove(tx.xidVersion(), tx)) { + // 2. Unlock write resources. + unlockMultiple(tx, tx.writeEntries()); + + // 3. For pessimistic transaction, unlock read resources if required. + if (tx.pessimistic() && !tx.readCommitted()) + unlockMultiple(tx, tx.readEntries()); + + // 4. Notify evictions. + notifyEvitions(tx); + + // 5. Remove obsolete entries. + removeObsolete(tx); + + // 6. Clean start transaction number for this transaction. + if (cctx.txConfig().isTxSerializableEnabled()) + decrementStartVersionCount(tx); + + // 7. Remove from per-thread storage. + if (tx.local() && !tx.dht()) + threadMap.remove(tx.threadId(), tx); + + // 8. Unregister explicit locks. + if (!tx.alternateVersions().isEmpty()) + for (GridCacheVersion ver : tx.alternateVersions()) + idMap.remove(ver); + + // 9. Remove Near-2-DHT mappings. + if (tx instanceof GridCacheMappedVersion) + mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion()); + + // 10. Clear context. + txContextReset(); + + // 11. Update metrics. + if (!tx.dht() && tx.local()) { + cctx.txMetrics().onTxRollback(); + + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + + cacheCtx.cache().metrics0().onTxRollback(); + } + } + + if (log.isDebugEnabled()) + log.debug("Rolled back from TM: " + tx); + } + else if (log.isDebugEnabled()) + log.debug("Did not rollback from TM (was already rolled back): " + tx); + } + + /** + * Tries to minimize damage from partially-committed transaction. + * + * @param tx Tx to uncommit. + */ + public void uncommitTx(IgniteTxEx<K, V> tx) { + assert tx != null; + + if (log.isDebugEnabled()) + log.debug("Uncommiting from TM: " + tx); + + ConcurrentMap<GridCacheVersion, IgniteTxEx<K, V>> txIdMap = transactionMap(tx); + + if (txIdMap.remove(tx.xidVersion(), tx)) { + // 1. Unlock write resources. + unlockMultiple(tx, tx.writeEntries()); + + // 2. For pessimistic transaction, unlock read resources if required. + if (tx.pessimistic() && !tx.readCommitted()) + unlockMultiple(tx, tx.readEntries()); + + // 3. Notify evictions. + notifyEvitions(tx); + + // 4. Clean start transaction number for this transaction. + if (cctx.txConfig().isTxSerializableEnabled()) + decrementStartVersionCount(tx); + + // 5. Remove from per-thread storage. + if (tx.local() && !tx.dht()) + threadMap.remove(tx.threadId(), tx); + + // 6. Unregister explicit locks. + if (!tx.alternateVersions().isEmpty()) + for (GridCacheVersion ver : tx.alternateVersions()) + idMap.remove(ver); + + // 7. Remove Near-2-DHT mappings. + if (tx instanceof GridCacheMappedVersion) + mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion()); + + // 8. Clear context. + txContextReset(); + + if (log.isDebugEnabled()) + log.debug("Uncommitted from TM: " + tx); + } + else if (log.isDebugEnabled()) + log.debug("Did not uncommit from TM (was already committed or rolled back): " + tx); + } + + /** + * Gets transaction ID map depending on transaction type. + * + * @param tx Transaction. + * @return Transaction map. + */ + private ConcurrentMap<GridCacheVersion, IgniteTxEx<K, V>> transactionMap(IgniteTxEx<K, V> tx) { + return (tx.near() && !tx.local()) ? nearIdMap : idMap; + } + + /** + * @param tx Transaction to notify evictions for. + */ + private void notifyEvitions(IgniteTxEx<K, V> tx) { + if (tx.internal() && !tx.groupLock()) + return; + + for (IgniteTxEntry<K, V> txEntry : tx.allEntries()) + txEntry.cached().context().evicts().touch(txEntry, tx.local()); + } + + /** + * Callback invoked whenever a member of a transaction acquires + * lock ownership. + * + * @param entry Cache entry. + * @param owner Candidate that won ownership. + * @return {@code True} if transaction was notified, {@code false} otherwise. + */ + public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) { + // We only care about acquired locks. + if (owner != null) { + IgniteTxAdapter<K, V> tx = tx(owner.version()); + + if (tx == null) + tx = nearTx(owner.version()); + + if (tx != null) { + if (!tx.local()) { + if (log.isDebugEnabled()) + log.debug("Found transaction for owner changed event [owner=" + owner + ", entry=" + entry + + ", tx=" + tx + ']'); + + tx.onOwnerChanged(entry, owner); + + return true; + } + else if (log.isDebugEnabled()) + log.debug("Ignoring local transaction for owner change event: " + tx); + } + else if (log.isDebugEnabled()) + log.debug("Transaction not found for owner changed event [owner=" + owner + ", entry=" + entry + ']'); + } + + return false; + } + + /** + * Callback called by near finish future before sending near finish request to remote node. Will increment + * per-thread counter so that further awaitAck call will wait for finish response. + * + * @param rmtNodeId Remote node ID for which finish request is being sent. + * @param threadId Near tx thread ID. + */ + public void beforeFinishRemote(UUID rmtNodeId, long threadId) { + if (finishSyncDisabled) + return; + + assert txFinishSync != null; + + txFinishSync.onFinishSend(rmtNodeId, threadId); + } + + /** + * Callback invoked when near finish response is received from remote node. + * + * @param rmtNodeId Remote node ID from which response is received. + * @param threadId Near tx thread ID. + */ + public void onFinishedRemote(UUID rmtNodeId, long threadId) { + if (finishSyncDisabled) + return; + + assert txFinishSync != null; + + txFinishSync.onAckReceived(rmtNodeId, threadId); + } + + /** + * Asynchronously waits for last finish request ack. + * + * @param rmtNodeId Remote node ID. + * @param threadId Near tx thread ID. + * @return {@code null} if ack was received or future that will be completed when ack is received. + */ + @Nullable public IgniteFuture<?> awaitFinishAckAsync(UUID rmtNodeId, long threadId) { + if (finishSyncDisabled) + return null; + + assert txFinishSync != null; + + return txFinishSync.awaitAckAsync(rmtNodeId, threadId); + } + + /** + * For test purposes only. + * + * @param finishSyncDisabled {@code True} if finish sync should be disabled. + */ + public void finishSyncDisabled(boolean finishSyncDisabled) { + this.finishSyncDisabled = finishSyncDisabled; + } + + /** + * @param tx Transaction. + * @param entries Entries to lock. + * @return {@code True} if all keys were locked. + * @throws IgniteCheckedException If lock has been cancelled. + */ + private boolean lockMultiple(IgniteTxEx<K, V> tx, Iterable<IgniteTxEntry<K, V>> entries) + throws IgniteCheckedException { + assert tx.optimistic(); + + long remainingTime = U.currentTimeMillis() - (tx.startTime() + tx.timeout()); + + // For serializable transactions, failure to acquire lock means + // that there is a serializable conflict. For all other isolation levels, + // we wait for the lock. + long timeout = tx.timeout() == 0 ? 0 : remainingTime; + + for (IgniteTxEntry<K, V> txEntry1 : entries) { + // Check if this entry was prepared before. + if (!txEntry1.markPrepared()) + continue; + + GridCacheContext<K, V> cacheCtx = txEntry1.context(); + + while (true) { + try { + GridCacheEntryEx<K, V> entry1 = txEntry1.cached(); + + assert !entry1.detached() : "Expected non-detached entry for near transaction " + + "[locNodeId=" + cctx.localNodeId() + ", entry=" + entry1 + ']'; + + if (!entry1.tmLock(tx, timeout)) { + // Unlock locks locked so far. + for (IgniteTxEntry<K, V> txEntry2 : entries) { + if (txEntry2 == txEntry1) + break; + + txEntry2.cached().txUnlock(tx); + } + + return false; + } + + entry1.unswap(); + + break; + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in TM lockMultiple(..) method (will retry): " + txEntry1); + + try { + // Renew cache entry. + txEntry1.cached(cacheCtx.cache().entryEx(txEntry1.key()), txEntry1.keyBytes()); + } + catch (GridDhtInvalidPartitionException e) { + assert tx.dht() : "Received invalid partition for non DHT transaction [tx=" + + tx + ", invalidPart=" + e.partition() + ']'; + + // If partition is invalid, we ignore this entry. + tx.addInvalidPartition(cacheCtx, e.partition()); + + break; + } + } + catch (GridDistributedLockCancelledException ignore) { + tx.setRollbackOnly(); + + throw new IgniteCheckedException("Entry lock has been cancelled for transaction: " + tx); + } + } + } + + return true; + } + + /** + * Unlocks entries locked by group transaction. + * + * @param txx Transaction. + */ + @SuppressWarnings("unchecked") + private void unlockGroupLocks(IgniteTxEx txx) { + IgniteTxKey grpLockKey = txx.groupLockKey(); + + assert grpLockKey != null; + + if (grpLockKey == null) + return; + + IgniteTxEntry txEntry = txx.entry(grpLockKey); + + assert txEntry != null || (txx.near() && !txx.local()); + + if (txEntry != null) { + GridCacheContext cacheCtx = txEntry.context(); + + // Group-locked entries must be locked. + while (true) { + try { + GridCacheEntryEx<K, V> entry = txEntry.cached(); + + assert entry != null; + + entry.txUnlock(txx); + + break; + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in TM unlockGroupLocks(..) method (will retry): " + txEntry); + + GridCacheAdapter cache = cacheCtx.cache(); + + // Renew cache entry. + txEntry.cached(cache.entryEx(txEntry.key()), txEntry.keyBytes()); + } + } + } + } + + /** + * @param tx Owning transaction. + * @param entries Entries to unlock. + */ + private void unlockMultiple(IgniteTxEx<K, V> tx, Iterable<IgniteTxEntry<K, V>> entries) { + for (IgniteTxEntry<K, V> txEntry : entries) { + GridCacheContext<K, V> cacheCtx = txEntry.context(); + + while (true) { + try { + GridCacheEntryEx<K, V> entry = txEntry.cached(); + + if (entry.detached()) + break; + + assert entry != null; + + entry.txUnlock(tx); + + break; + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in TM unlockMultiple(..) method (will retry): " + txEntry); + + // Renew cache entry. + txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), txEntry.keyBytes()); + } + } + } + } + + /** + * @param sync Transaction synchronizations to add. + */ + public void addSynchronizations(IgniteTxSynchronization... sync) { + if (F.isEmpty(sync)) + return; + + F.copy(syncs, sync); + } + + /** + * @param sync Transaction synchronizations to remove. + */ + public void removeSynchronizations(IgniteTxSynchronization... sync) { + if (F.isEmpty(sync)) + return; + + F.lose(syncs, false, Arrays.asList(sync)); + } + + /** + * @return Registered transaction synchronizations + */ + public Collection<IgniteTxSynchronization> synchronizations() { + return Collections.unmodifiableList(new LinkedList<>(syncs)); + } + + /** + * @param prevState Previous state. + * @param newState New state. + * @param tx Cache transaction. + */ + public void onTxStateChange(@Nullable IgniteTxState prevState, IgniteTxState newState, IgniteTx tx) { + // Notify synchronizations. + for (IgniteTxSynchronization s : syncs) + s.onStateChanged(prevState, newState, tx); + } + + /** + * @param tx Committing transaction. + */ + public void txContext(IgniteTxEx tx) { + threadCtx.set(tx); + } + + /** + * @return Currently committing transaction. + */ + @SuppressWarnings({"unchecked"}) + private IgniteTxEx<K, V> txContext() { + return threadCtx.get(); + } + + /** + * Gets version of transaction in tx context or {@code null} + * if tx context is empty. + * <p> + * This is a convenience method provided mostly for debugging. + * + * @return Transaction version from transaction context. + */ + @Nullable public GridCacheVersion txContextVersion() { + IgniteTxEx<K, V> tx = txContext(); + + return tx == null ? null : tx.xidVersion(); + } + + /** + * Commit ended. + */ + public void txContextReset() { + threadCtx.set(null); + } + + /** + * @return All transactions. + */ + public Collection<IgniteTxEx<K, V>> txs() { + return F.concat(false, idMap.values(), nearIdMap.values()); + } + + /** + * @return Slow tx warn timeout. + */ + public int slowTxWarnTimeout() { + return slowTxWarnTimeout; + } + + /** + * @param slowTxWarnTimeout Slow tx warn timeout. + */ + public void slowTxWarnTimeout(int slowTxWarnTimeout) { + this.slowTxWarnTimeout = slowTxWarnTimeout; + } + + /** + * Checks if transactions with given near version ID was prepared or committed. + * + * @param nearVer Near version ID. + * @param txNum Number of transactions. + * @return {@code True} if transactions were prepared or committed. + */ + public boolean txsPreparedOrCommitted(GridCacheVersion nearVer, int txNum) { + Collection<GridCacheVersion> processedVers = null; + + for (IgniteTxEx<K, V> tx : txs()) { + if (nearVer.equals(tx.nearXidVersion())) { + IgniteTxState state = tx.state(); + + if (state == PREPARED || state == COMMITTING || state == COMMITTED) { + if (--txNum == 0) + return true; + } + else { + if (tx.state(MARKED_ROLLBACK) || tx.state() == UNKNOWN) { + tx.rollbackAsync(); + + if (log.isDebugEnabled()) + log.debug("Transaction was not prepared (rolled back): " + tx); + + return false; + } + else { + if (tx.state() == COMMITTED) { + if (--txNum == 0) + return true; + } + else { + if (log.isDebugEnabled()) + log.debug("Transaction is not prepared: " + tx); + + return false; + } + } + } + + if (processedVers == null) + processedVers = new HashSet<>(txNum, 1.0f); + + processedVers.add(tx.xidVersion()); + } + } + + // Not all transactions were found. Need to scan committed versions to check + // if transaction was already committed. + for (GridCacheVersion ver : committedVers) { + if (processedVers != null && processedVers.contains(ver)) + continue; + + if (ver instanceof CommittedVersion) { + CommittedVersion commitVer = (CommittedVersion)ver; + + if (commitVer.nearVer.equals(nearVer)) { + if (--txNum == 0) + return true; + } + } + } + + return false; + } + + /** + * Adds transaction to pessimistic recovery buffer if needed. + * + * @param tx Committed transaction to add. + */ + private void addPessimisticRecovery(IgniteTxEx<K, V> tx) { + if (pessimisticRecoveryBuf == null) + return; + + // Do not store recovery information for optimistic or replicated local transactions. + if (tx.optimistic() || (tx.local() && tx.replicated())) + return; + + pessimisticRecoveryBuf.addCommittedTx(tx); + } + + /** + * Checks whether transaction with given near version was committed on this node and returns commit info. + * + * @param nearTxVer Near tx version. + * @param originatingNodeId Originating node ID. + * @param originatingThreadId Originating thread ID. + * @return Commit info, if present. + */ + @Nullable public GridCacheCommittedTxInfo<K, V> txCommitted(GridCacheVersion nearTxVer, + UUID originatingNodeId, long originatingThreadId) { + assert pessimisticRecoveryBuf != null : "Should not be called for LOCAL cache."; + + return pessimisticRecoveryBuf.committedTx(nearTxVer, originatingNodeId, originatingThreadId); + } + + /** + * Gets local transaction for pessimistic tx recovery. + * + * @param nearXidVer Near tx ID. + * @return Near local or colocated local transaction. + */ + @Nullable public IgniteTxEx<K, V> localTxForRecovery(GridCacheVersion nearXidVer, boolean markFinalizing) { + // First check if we have near transaction with this ID. + IgniteTxEx<K, V> tx = idMap.get(nearXidVer); + + if (tx == null) { + // Check all local transactions and mark them as waiting for recovery to prevent finish race. + for (IgniteTxEx<K, V> txEx : idMap.values()) { + if (nearXidVer.equals(txEx.nearXidVersion())) { + if (!markFinalizing || !txEx.markFinalizing(RECOVERY_WAIT)) + tx = txEx; + } + } + } + + // Either we found near transaction or one of transactions is being committed by user. + // Wait for it and send reply. + if (tx != null && tx.local()) + return tx; + + return null; + } + + /** + * Commits or rolls back prepared transaction. + * + * @param tx Transaction. + * @param commit Whether transaction should be committed or rolled back. + */ + public void finishOptimisticTxOnRecovery(final IgniteTxEx<K, V> tx, boolean commit) { + if (log.isDebugEnabled()) + log.debug("Finishing prepared transaction [tx=" + tx + ", commit=" + commit + ']'); + + if (!tx.markFinalizing(RECOVERY_FINISH)) { + if (log.isDebugEnabled()) + log.debug("Will not try to commit prepared transaction (could not mark finalized): " + tx); + + return; + } + + if (tx instanceof GridDistributedTxRemoteAdapter) { + IgniteTxRemoteEx<K,V> rmtTx = (IgniteTxRemoteEx<K, V>)tx; + + rmtTx.doneRemote(tx.xidVersion(), Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList(), + Collections.<GridCacheVersion>emptyList()); + } + + if (commit) + tx.commitAsync().listenAsync(new CommitListener(tx)); + else + tx.rollbackAsync(); + } + + /** + * Commits or rolls back pessimistic transaction. + * + * @param tx Transaction to finish. + * @param commitInfo Commit information. + */ + public void finishPessimisticTxOnRecovery(final IgniteTxEx<K, V> tx, GridCacheCommittedTxInfo<K, V> commitInfo) { + if (!tx.markFinalizing(RECOVERY_FINISH)) { + if (log.isDebugEnabled()) + log.debug("Will not try to finish pessimistic transaction (could not mark as finalizing): " + tx); + + return; + } + + if (tx instanceof GridDistributedTxRemoteAdapter) { + IgniteTxRemoteEx<K,V> rmtTx = (IgniteTxRemoteEx<K, V>)tx; + + rmtTx.doneRemote(tx.xidVersion(), Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList(), + Collections.<GridCacheVersion>emptyList()); + } + + try { + tx.prepare(); + + if (commitInfo != null) { + for (IgniteTxEntry<K, V> entry : commitInfo.recoveryWrites()) { + IgniteTxEntry<K, V> write = tx.writeMap().get(entry.txKey()); + + if (write != null) { + GridCacheEntryEx<K, V> cached = write.cached(); + + IgniteTxEntry<K, V> recovered = entry.cleanCopy(write.context()); + + if (cached == null || cached.detached()) + cached = write.context().cache().entryEx(entry.key(), tx.topologyVersion()); + + recovered.cached(cached, cached.keyBytes()); + + tx.writeMap().put(entry.txKey(), recovered); + + continue; + } + + ((IgniteTxAdapter<K, V>)tx).recoveryWrites(commitInfo.recoveryWrites()); + + // If write was not found, check read. + IgniteTxEntry<K, V> read = tx.readMap().remove(entry.txKey()); + + if (read != null) + tx.writeMap().put(entry.txKey(), entry); + } + + tx.commitAsync().listenAsync(new CommitListener(tx)); + } + else + tx.rollbackAsync(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to prepare pessimistic transaction (will invalidate): " + tx, e); + + salvageTx(tx); + } + } + + /** + * @param req Check committed request. + * @return Check committed future. + */ + public IgniteFuture<GridCacheCommittedTxInfo<K, V>> checkPessimisticTxCommitted(GridCachePessimisticCheckCommittedTxRequest req) { + // First check if we have near transaction with this ID. + IgniteTxEx<K, V> tx = localTxForRecovery(req.nearXidVersion(), !req.nearOnlyCheck()); + + // Either we found near transaction or one of transactions is being committed by user. + // Wait for it and send reply. + if (tx != null) { + assert tx.local(); + + if (log.isDebugEnabled()) + log.debug("Found active near transaction, will wait for completion [req=" + req + ", tx=" + tx + ']'); + + final IgniteTxEx<K, V> tx0 = tx; + + return tx.finishFuture().chain(new C1<IgniteFuture<IgniteTx>, GridCacheCommittedTxInfo<K, V>>() { + @Override public GridCacheCommittedTxInfo<K, V> apply(IgniteFuture<IgniteTx> txFut) { + GridCacheCommittedTxInfo<K, V> info = null; + + if (tx0.state() == COMMITTED) + info = new GridCacheCommittedTxInfo<>(tx0); + + return info; + } + }); + } + + GridCacheCommittedTxInfo<K, V> info = txCommitted(req.nearXidVersion(), req.originatingNodeId(), + req.originatingThreadId()); + + if (info == null) + info = txCommitted(req.nearXidVersion(), req.originatingNodeId(), req.originatingThreadId()); + + return new GridFinishedFutureEx<>(info); + } + + /** + * Timeout object for node failure handler. + */ + private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter { + /** Left or failed node. */ + private final UUID evtNodeId; + + /** + * @param evtNodeId Event node ID. + */ + private NodeFailureTimeoutObject(UUID evtNodeId) { + super(IgniteUuid.fromUuid(cctx.localNodeId()), TX_SALVAGE_TIMEOUT); + + this.evtNodeId = evtNodeId; + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + try { + cctx.kernalContext().gateway().readLock(); + } + catch (IllegalStateException ignore) { + if (log.isDebugEnabled()) + log.debug("Failed to acquire kernal gateway (grid is stopping)."); + + return; + } + + try { + if (log.isDebugEnabled()) + log.debug("Processing node failed event [locNodeId=" + cctx.localNodeId() + + ", failedNodeId=" + evtNodeId + ']'); + + for (IgniteTxEx<K, V> tx : txs()) { + if ((tx.near() && !tx.local()) || (tx.storeUsed() && tx.masterNodeIds().contains(evtNodeId))) { + // Invalidate transactions. + salvageTx(tx, false, RECOVERY_FINISH); + } + else if (tx.optimistic()) { + // Check prepare only if originating node ID failed. Otherwise parent node will finish this tx. + if (tx.originatingNodeId().equals(evtNodeId)) { + if (tx.state() == PREPARED) + commitIfPrepared(tx); + else { + if (tx.setRollbackOnly()) + tx.rollbackAsync(); + // If we could not mark tx as rollback, it means that transaction is being committed. + } + } + } + else { + // Pessimistic. + if (tx.originatingNodeId().equals(evtNodeId)) { + if (tx.state() != COMMITTING && tx.state() != COMMITTED) + commitIfRemotelyCommitted(tx); + else { + if (log.isDebugEnabled()) + log.debug("Skipping pessimistic transaction check (transaction is being committed) " + + "[tx=" + tx + ", locNodeId=" + cctx.localNodeId() + ']'); + } + } + else { + if (log.isDebugEnabled()) + log.debug("Skipping pessimistic transaction check [tx=" + tx + + ", evtNodeId=" + evtNodeId + ", locNodeId=" + cctx.localNodeId() + ']'); + } + } + } + } + finally { + cctx.kernalContext().gateway().readUnlock(); + } + } + + /** + * Commits optimistic transaction in case when node started transaction failed, but all related + * transactions were prepared (invalidates transaction if it is not fully prepared). + * + * @param tx Transaction. + */ + private void commitIfPrepared(IgniteTxEx<K, V> tx) { + assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote : tx; + assert !F.isEmpty(tx.transactionNodes()); + assert tx.nearXidVersion() != null; + + + GridCacheOptimisticCheckPreparedTxFuture<K, V> fut = new GridCacheOptimisticCheckPreparedTxFuture<>( + cctx, tx, evtNodeId, tx.transactionNodes()); + + cctx.mvcc().addFuture(fut); + + if (log.isDebugEnabled()) + log.debug("Checking optimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']'); + + fut.prepare(); + } + + /** + * Commits pessimistic transaction if at least one of remote nodes has committed this transaction. + * + * @param tx Transaction. + */ + private void commitIfRemotelyCommitted(IgniteTxEx<K, V> tx) { + assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote : tx; + + GridCachePessimisticCheckCommittedTxFuture<K, V> fut = new GridCachePessimisticCheckCommittedTxFuture<>( + cctx, tx, evtNodeId); + + cctx.mvcc().addFuture(fut); + + if (log.isDebugEnabled()) + log.debug("Checking pessimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']'); + + fut.prepare(); + } + } + + /** + * + */ + private static class CommittedVersion extends GridCacheVersion { + /** */ + private static final long serialVersionUID = 0L; + + /** Corresponding near version. Transient. */ + private GridCacheVersion nearVer; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public CommittedVersion() { + // No-op. + } + + /** + * @param ver Committed version. + * @param nearVer Near transaction version. + */ + private CommittedVersion(GridCacheVersion ver, GridCacheVersion nearVer) { + super(ver.topologyVersion(), ver.globalTime(), ver.order(), ver.nodeOrder(), ver.dataCenterId()); + + assert nearVer != null; + + this.nearVer = nearVer; + } + } + + /** + * Atomic integer that compares only using references, not values. + */ + private static final class AtomicInt extends AtomicInteger { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param initVal Initial value. + */ + private AtomicInt(int initVal) { + super(initVal); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + // Reference only. + return obj == this; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return super.hashCode(); + } + } + + /** + * Commit listener. Checks if commit succeeded and rollbacks if case of error. + */ + private class CommitListener implements CI1<IgniteFuture<IgniteTx>> { + /** */ + private static final long serialVersionUID = 0L; + + /** Transaction. */ + private final IgniteTxEx<K, V> tx; + + /** + * @param tx Transaction. + */ + private CommitListener(IgniteTxEx<K, V> tx) { + this.tx = tx; + } + + /** {@inheritDoc} */ + @Override public void apply(IgniteFuture<IgniteTx> t) { + try { + t.get(); + } + catch (IgniteTxOptimisticException ignore) { + if (log.isDebugEnabled()) + log.debug("Optimistic failure while committing prepared transaction (will rollback): " + + tx); + + tx.rollbackAsync(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to commit transaction during failover: " + tx, e); + } + } + } +}