http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7a9a1db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --cc
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 0000000,b6db5b1..fae1989
mode 000000,100644..100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@@ -1,0 -1,2221 +1,2222 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.transactions;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.distributed.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.transactions.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+ import org.apache.ignite.internal.processors.cache.distributed.near.*;
+ import org.apache.ignite.internal.processors.timeout.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jdk8.backport.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+
+ import static org.apache.ignite.IgniteSystemProperties.*;
+ import static org.apache.ignite.events.IgniteEventType.*;
+ import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+ import static org.apache.ignite.transactions.IgniteTxState.*;
+ import static
org.apache.ignite.internal.processors.cache.transactions.IgniteTxEx.FinalizationStatus.*;
+ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.*;
+ import static org.apache.ignite.internal.util.GridConcurrentFactory.*;
+
+ /**
+ * Cache transaction manager.
+ */
+ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K,
V> {
+ /** Default maximum number of transactions that have completed. */
+ private static final int DFLT_MAX_COMPLETED_TX_CNT = 262144; // 2^18
+
+ /** Slow tx warn timeout (initialized to 0). */
+ private static final int SLOW_TX_WARN_TIMEOUT =
Integer.getInteger(GG_SLOW_TX_WARN_TIMEOUT, 0);
+
+ /** Tx salvage timeout (default 3s). */
+ private static final int TX_SALVAGE_TIMEOUT =
Integer.getInteger(GG_TX_SALVAGE_TIMEOUT, 100);
+
+ /** Committing transactions. */
+ private final ThreadLocal<IgniteTxEx> threadCtx = new
GridThreadLocalEx<>();
+
+ /** Per-thread transaction map. */
+ private final ConcurrentMap<Long, IgniteTxEx<K, V>> threadMap = newMap();
+
+ /** Per-ID map. */
+ private final ConcurrentMap<GridCacheVersion, IgniteTxEx<K, V>> idMap =
newMap();
+
+ /** Per-ID map for near transactions. */
+ private final ConcurrentMap<GridCacheVersion, IgniteTxEx<K, V>> nearIdMap
= newMap();
+
+ /** TX handler. */
+ private IgniteTxHandler<K, V> txHandler;
+
+ /** All transactions. */
+ private final Queue<IgniteTxEx<K, V>> committedQ = new
ConcurrentLinkedDeque8<>();
+
+ /** Preparing transactions. */
+ private final Queue<IgniteTxEx<K, V>> prepareQ = new
ConcurrentLinkedDeque8<>();
+
+ /** Minimum start version. */
+ private final ConcurrentNavigableMap<GridCacheVersion, AtomicInt>
startVerCnts =
+ new ConcurrentSkipListMap<>();
+
+ /** Committed local transactions. */
+ private final GridBoundedConcurrentOrderedSet<GridCacheVersion>
committedVers =
+ new
GridBoundedConcurrentOrderedSet<>(Integer.getInteger(GG_MAX_COMPLETED_TX_COUNT,
DFLT_MAX_COMPLETED_TX_CNT));
+
+ /** Rolled back local transactions. */
+ private final NavigableSet<GridCacheVersion> rolledbackVers =
+ new
GridBoundedConcurrentOrderedSet<>(Integer.getInteger(GG_MAX_COMPLETED_TX_COUNT,
DFLT_MAX_COMPLETED_TX_CNT));
+
+ /** Pessimistic commit buffer. */
+ private GridCacheTxCommitBuffer<K, V> pessimisticRecoveryBuf;
+
+ /** Transaction synchronizations. */
+ private final Collection<IgniteTxSynchronization> syncs =
+ new GridConcurrentHashSet<>();
+
+ /** Transaction finish synchronizer. */
+ private GridCacheTxFinishSync<K, V> txFinishSync;
+
+ /** For test purposes only. */
+ private boolean finishSyncDisabled;
+
+ /** Slow tx warn timeout. */
+ private int slowTxWarnTimeout = SLOW_TX_WARN_TIMEOUT;
+
+ /**
+ * Near version to DHT version map. Note that we initialize to 5K size
from get go,
+ * to avoid future map resizings.
+ */
+ private final ConcurrentMap<GridCacheVersion, GridCacheVersion>
mappedVers =
+ new ConcurrentHashMap8<>(5120);
+
+ /** {@inheritDoc} */
+ @Override protected void onKernalStart0() {
+ cctx.gridEvents().addLocalEventListener(
+ new GridLocalEventListener() {
+ @Override public void onEvent(IgniteEvent evt) {
+ assert evt instanceof IgniteDiscoveryEvent;
+ assert evt.type() == EVT_NODE_FAILED || evt.type() ==
EVT_NODE_LEFT;
+
+ IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt;
+
+ cctx.time().addTimeoutObject(new
NodeFailureTimeoutObject(discoEvt.eventNode().id()));
+
+ if (txFinishSync != null)
+ txFinishSync.onNodeLeft(discoEvt.eventNode().id());
+ }
+ },
+ EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+ for (IgniteTxEx<K, V> tx : idMap.values()) {
+ if ((!tx.local() || tx.dht()) &&
!cctx.discovery().aliveAll(tx.masterNodeIds())) {
+ if (log.isDebugEnabled())
+ log.debug("Remaining transaction from left node: " + tx);
+
+ salvageTx(tx, true, USER_FINISH);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void start0() throws IgniteCheckedException {
+ pessimisticRecoveryBuf = new GridCachePerThreadTxCommitBuffer<>(cctx);
+
+ txFinishSync = new GridCacheTxFinishSync<>(cctx);
+
+ txHandler = new IgniteTxHandler<>(cctx);
+ }
+
+ /**
+ * @return TX handler.
+ */
+ public IgniteTxHandler<K, V> txHandler() {
+ return txHandler;
+ }
+
+ /**
+ * Invalidates transaction.
+ *
+ * @param tx Transaction.
+ * @return {@code True} if transaction was salvaged by this call.
+ */
+ public boolean salvageTx(IgniteTxEx<K, V> tx) {
+ return salvageTx(tx, false, USER_FINISH);
+ }
+
+ /**
+ * Invalidates transaction.
+ *
+ * @param tx Transaction.
+ * @param warn {@code True} if warning should be logged.
+ * @param status Finalization status.
+ * @return {@code True} if transaction was salvaged by this call.
+ */
+ private boolean salvageTx(IgniteTxEx<K, V> tx, boolean warn,
IgniteTxEx.FinalizationStatus status) {
+ assert tx != null;
+
+ IgniteTxState state = tx.state();
+
+ if (state == ACTIVE || state == PREPARING || state == PREPARED) {
+ try {
+ if (!tx.markFinalizing(status)) {
+ if (log.isDebugEnabled())
+ log.debug("Will not try to commit invalidate
transaction (could not mark finalized): " + tx);
+
+ return false;
+ }
+
+ tx.systemInvalidate(true);
+
+ tx.prepare();
+
+ if (tx.state() == PREPARING) {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring transaction in PREPARING state as
it is currently handled " +
+ "by another thread: " + tx);
+
+ return false;
+ }
+
+ if (tx instanceof IgniteTxRemoteEx) {
+ IgniteTxRemoteEx<K, V> rmtTx = (IgniteTxRemoteEx<K, V>)tx;
+
+ rmtTx.doneRemote(tx.xidVersion(),
Collections.<GridCacheVersion>emptyList(),
+ Collections.<GridCacheVersion>emptyList(),
Collections.<GridCacheVersion>emptyList());
+ }
+
+ tx.commit();
+
+ if (warn) {
+ // This print out cannot print any peer-deployed entity
either
+ // directly or indirectly.
+ U.warn(log, "Invalidated transaction because originating
node either " +
+ "crashed or left grid: " + CU.txString(tx));
+ }
+ }
+ catch (IgniteTxOptimisticException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Optimistic failure while invalidating
transaction (will rollback): " +
+ tx.xidVersion());
+
+ try {
+ tx.rollback();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to rollback transaction: " +
tx.xidVersion(), e);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to invalidate transaction: " + tx, e);
+ }
+ }
+ else if (state == MARKED_ROLLBACK) {
+ try {
+ tx.rollback();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to rollback transaction: " +
tx.xidVersion(), e);
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Prints out memory stats to standard out.
+ * <p>
+ * USE ONLY FOR MEMORY PROFILING DURING TESTS.
+ */
+ @Override public void printMemoryStats() {
+ IgniteTxEx<K, V> firstTx = committedQ.peek();
+
+ int committedSize = committedQ.size();
+
+ Map.Entry<GridCacheVersion, AtomicInt> startVerEntry =
startVerCnts.firstEntry();
+
+ GridCacheVersion minStartVer = null;
+ long dur = 0;
+
+ if (committedSize > 3000) {
+ minStartVer = new GridCacheVersion(Integer.MAX_VALUE,
Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE, 0);
+
+ IgniteTxEx<K, V> stuck = null;
+
+ for (IgniteTxEx<K, V> tx : txs())
+ if (tx.startVersion().isLess(minStartVer)) {
+ minStartVer = tx.startVersion();
+ dur = U.currentTimeMillis() - tx.startTime();
+
+ stuck = tx;
+ }
+
+ X.println("Stuck transaction: " + stuck);
+ }
+
+ X.println(">>> ");
+ X.println(">>> Transaction manager memory stats [grid=" +
cctx.gridName() + ']');
+ X.println(">>> threadMapSize: " + threadMap.size());
+ X.println(">>> idMap [size=" + idMap.size() + ", minStartVer=" +
minStartVer + ", dur=" + dur + "ms]");
+ X.println(">>> committedQueue [size=" + committedSize +
+ ", firstStartVersion=" + (firstTx == null ? "null" :
firstTx.startVersion()) +
+ ", firstEndVersion=" + (firstTx == null ? "null" :
firstTx.endVersion()) + ']');
+ X.println(">>> prepareQueueSize: " + prepareQ.size());
+ X.println(">>> startVerCntsSize [size=" + startVerCnts.size() +
+ ", firstVer=" + startVerEntry + ']');
+ X.println(">>> committedVersSize: " + committedVers.size());
+ X.println(">>> rolledbackVersSize: " + rolledbackVers.size());
+
+ if (pessimisticRecoveryBuf != null)
+ X.println(">>> pessimsticCommitBufSize: " +
pessimisticRecoveryBuf.size());
+ }
+
+ /**
+ * @return Thread map size.
+ */
+ public int threadMapSize() {
+ return threadMap.size();
+ }
+
+ /**
+ * @return ID map size.
+ */
+ public int idMapSize() {
+ return idMap.size();
+ }
+
+ /**
+ * @return Committed queue size.
+ */
+ public int commitQueueSize() {
+ return committedQ.size();
+ }
+
+ /**
+ * @return Prepare queue size.
+ */
+ public int prepareQueueSize() {
+ return prepareQ.size();
+ }
+
+ /**
+ * @return Start version counts.
+ */
+ public int startVersionCountsSize() {
+ return startVerCnts.size();
+ }
+
+ /**
+ * @return Committed versions size.
+ */
+ public int committedVersionsSize() {
+ return committedVers.size();
+ }
+
+ /**
+ * @return Rolled back versions size.
+ */
+ public int rolledbackVersionsSize() {
+ return rolledbackVers.size();
+ }
+
+ /**
+ *
+ * @param tx Transaction to check.
+ * @return {@code True} if transaction has been committed or rolled back,
+ * {@code false} otherwise.
+ */
+ public boolean isCompleted(IgniteTxEx<K, V> tx) {
+ return committedVers.contains(tx.xidVersion()) ||
rolledbackVers.contains(tx.xidVersion());
+ }
+
+ /**
+ * @param implicit {@code True} if transaction is implicit.
+ * @param implicitSingle Implicit-with-single-key flag.
+ * @param concurrency Concurrency.
+ * @param isolation Isolation.
+ * @param timeout transaction timeout.
+ * @param txSize Expected transaction size.
+ * @param grpLockKey Group lock key if this is a group-lock transaction.
+ * @param partLock {@code True} if partition is locked.
+ * @return New transaction.
+ */
+ public IgniteTxLocalAdapter<K, V> newTx(
+ boolean implicit,
+ boolean implicitSingle,
+ boolean sys,
+ IgniteTxConcurrency concurrency,
+ IgniteTxIsolation isolation,
+ long timeout,
+ boolean invalidate,
+ boolean storeEnabled,
+ int txSize,
+ @Nullable IgniteTxKey grpLockKey,
+ boolean partLock) {
+ UUID subjId = null; // TODO GG-9141 how to get subj ID?
+
+ int taskNameHash = cctx.kernalContext().job().currentTaskNameHash();
+
+ GridNearTxLocal<K, V> tx = new GridNearTxLocal<>(
+ cctx,
+ implicit,
+ implicitSingle,
+ sys,
+ concurrency,
+ isolation,
+ timeout,
+ invalidate,
+ storeEnabled,
+ txSize,
+ grpLockKey,
+ partLock,
+ subjId,
+ taskNameHash);
+
+ return onCreated(tx);
+ }
+
+ /**
+ * @param tx Created transaction.
+ * @return Started transaction.
+ */
+ @Nullable public <T extends IgniteTxEx<K, V>> T onCreated(T tx) {
+ ConcurrentMap<GridCacheVersion, IgniteTxEx<K, V>> txIdMap =
transactionMap(tx);
+
+ // Start clean.
+ txContextReset();
+
+ if (isCompleted(tx)) {
+ if (log.isDebugEnabled())
+ log.debug("Attempt to create a completed transaction (will
ignore): " + tx);
+
+ return null;
+ }
+
+ IgniteTxEx<K, V> t;
+
+ if ((t = txIdMap.putIfAbsent(tx.xidVersion(), tx)) == null) {
+ // Add both, explicit and implicit transactions.
+ // Do not add remote and dht local transactions as remote node
may have the same thread ID
+ // and overwrite local transaction.
+ if (tx.local() && !tx.dht())
+ threadMap.put(tx.threadId(), tx);
+
+ // Handle mapped versions.
+ if (tx instanceof GridCacheMappedVersion) {
+ GridCacheMappedVersion mapped = (GridCacheMappedVersion)tx;
+
+ GridCacheVersion from = mapped.mappedVersion();
+
+ if (from != null)
+ mappedVers.put(from, tx.xidVersion());
+
+ if (log.isDebugEnabled())
+ log.debug("Added transaction version mapping [from=" +
from + ", to=" + tx.xidVersion() +
+ ", tx=" + tx + ']');
+ }
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Attempt to create an existing transaction (will
ignore) [newTx=" + tx + ", existingTx=" +
+ t + ']');
+
+ return null;
+ }
+
+ if (cctx.txConfig().isTxSerializableEnabled()) {
+ AtomicInt next = new AtomicInt(1);
+
+ boolean loop = true;
+
+ while (loop) {
+ AtomicInt prev = startVerCnts.putIfAbsent(tx.startVersion(),
next);
+
+ if (prev == null)
+ break; // Put succeeded - exit.
+
+ // Previous value was 0, which means that it will be deleted
+ // by another thread in "decrementStartVersionCount(..)"
method.
+ // In that case, we delete here too, so we can safely try
again.
+ for (;;) {
+ int p = prev.get();
+
+ assert p >= 0 : p;
+
+ if (p == 0) {
+ if (startVerCnts.remove(tx.startVersion(), prev))
+ if (log.isDebugEnabled())
+ log.debug("Removed count from onCreated
callback: " + tx);
+
+ break; // Retry outer loop.
+ }
+
+ if (prev.compareAndSet(p, p + 1)) {
+ loop = false; // Increment succeeded - exit outer
loop.
+
+ break;
+ }
+ }
+ }
+ }
+
+ if (tx.timeout() > 0) {
+ cctx.time().addTimeoutObject(tx);
+
+ if (log.isDebugEnabled())
+ log.debug("Registered transaction with timeout processor: " +
tx);
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Transaction created: " + tx);
+
+ return tx;
+ }
+
+ /**
+ * Creates a future that will wait for all ongoing transactions that
maybe affected by topology update
+ * to be finished. This set of transactions include
+ * <ul>
+ * <li/> All {@link IgniteTxConcurrency#PESSIMISTIC} transactions
with topology version
+ * less or equal to {@code topVer}.
+ * <li/> {@link IgniteTxConcurrency#OPTIMISTIC} transactions in
PREPARING state with topology
+ * version less or equal to {@code topVer} and having transaction key
with entry that belongs to
+ * one of partitions in {@code parts}.
+ * </ul>
+ *
+ * @param topVer Topology version.
+ * @return Future that will be completed when all ongoing transactions
are finished.
+ */
+ public IgniteFuture<Boolean> finishTxs(long topVer) {
+ GridCompoundFuture<IgniteTx, Boolean> res =
+ new GridCompoundFuture<>(context().kernalContext(),
+ new IgniteReducer<IgniteTx, Boolean>() {
+ @Override public boolean collect(IgniteTx e) {
+ return true;
+ }
+
+ @Override public Boolean reduce() {
+ return true;
+ }
+ });
+
+ for (IgniteTxEx<K, V> tx : txs()) {
+ // Must wait for all transactions, even for DHT local and DHT
remote since preloading may acquire
+ // values pending to be overwritten by prepared transaction.
+
+ if (tx.concurrency() == PESSIMISTIC) {
+ if (tx.topologyVersion() > 0 && tx.topologyVersion() < topVer)
+ // For PESSIMISTIC mode we must wait for all uncompleted
txs
+ // as we do not know in advance which keys will
participate in tx.
+ res.add(tx.finishFuture());
+ }
+ else if (tx.concurrency() == OPTIMISTIC) {
+ // For OPTIMISTIC mode we wait only for txs in PREPARING
state that
+ // have keys for given partitions.
+ IgniteTxState state = tx.state();
+ long txTopVer = tx.topologyVersion();
+
+ if ((state == PREPARING || state == PREPARED || state ==
COMMITTING)
+ && txTopVer > 0 && txTopVer < topVer) {
+ res.add(tx.finishFuture());
+ }
+ }
+ }
+
+ res.markInitialized();
+
+ return res;
+ }
+
+ /**
+ * Transaction start callback (has to do with when any operation was
+ * performed on this transaction).
+ *
+ * @param tx Started transaction.
+ * @return {@code True} if transaction is not in completed set.
+ */
+ public boolean onStarted(IgniteTxEx<K, V> tx) {
+ assert tx.state() == ACTIVE || tx.isRollbackOnly() : "Invalid
transaction state [locId=" + cctx.localNodeId() +
+ ", tx=" + tx + ']';
+
+ if (isCompleted(tx)) {
+ if (log.isDebugEnabled())
+ log.debug("Attempt to start a completed transaction (will
ignore): " + tx);
+
+ return false;
+ }
+
+ onTxStateChange(null, ACTIVE, tx);
+
+ if (log.isDebugEnabled())
+ log.debug("Transaction started: " + tx);
+
+ return true;
+ }
+
+ /**
+ * Reverse mapped version look up.
+ *
+ * @param dhtVer Dht version.
+ * @return Near version.
+ */
+ @Nullable public GridCacheVersion nearVersion(GridCacheVersion dhtVer) {
+ IgniteTxEx<K, V> tx = idMap.get(dhtVer);
+
+ if (tx != null)
+ return tx.nearXidVersion();
+
+ return null;
+ }
+
+ /**
+ * @param from Near version.
+ * @return DHT version for a near version.
+ */
+ public GridCacheVersion mappedVersion(GridCacheVersion from) {
+ GridCacheVersion to = mappedVers.get(from);
+
+ if (log.isDebugEnabled())
+ log.debug("Found mapped version [from=" + from + ", to=" + to);
+
+ return to;
+ }
+
+ /**
+ *
+ * @param ver Alternate version.
+ * @param tx Transaction.
+ */
+ public void addAlternateVersion(GridCacheVersion ver, IgniteTxEx<K, V>
tx) {
+ if (idMap.putIfAbsent(ver, tx) == null)
+ if (log.isDebugEnabled())
+ log.debug("Registered alternate transaction version [ver=" +
ver + ", tx=" + tx + ']');
+ }
+
+ /**
+ * @return Local transaction.
+ */
+ @SuppressWarnings({"unchecked"})
+ @Nullable public <T> T localTx() {
+ IgniteTxEx<K, V> tx = tx();
+
+ return tx != null && tx.local() ? (T)tx : null;
+ }
+
+ /**
+ * @return Transaction for current thread.
+ */
+ @SuppressWarnings({"unchecked"})
+ public <T> T threadLocalTx() {
+ IgniteTxEx<K, V> tx = tx(Thread.currentThread().getId());
+
+ return tx != null && tx.local() && (!tx.dht() || tx.colocated()) &&
!tx.implicit() ? (T)tx : null;
+ }
+
+ /**
+ * @return Transaction for current thread.
+ */
+ @SuppressWarnings({"unchecked", "RedundantCast"})
+ public <T> T tx() {
+ IgniteTxEx<K, V> tx = txContext();
+
+ return tx != null ? (T)tx : (T)tx(Thread.currentThread().getId());
+ }
+
+ /**
+ * @return Local transaction.
+ */
+ @Nullable public IgniteTxEx<K, V> localTxx() {
+ IgniteTxEx<K, V> tx = txx();
+
+ return tx != null && tx.local() ? tx : null;
+ }
+
+ /**
+ * @return Transaction for current thread.
+ */
+ @SuppressWarnings({"unchecked"})
+ public IgniteTxEx<K, V> txx() {
+ return tx();
+ }
+
+ /**
+ * @return User transaction for current thread.
+ */
+ @Nullable public IgniteTx userTx() {
+ IgniteTxEx<K, V> tx = txContext();
+
+ if (tx != null && tx.user() && tx.state() == ACTIVE)
+ return tx;
+
+ tx = tx(Thread.currentThread().getId());
+
+ return tx != null && tx.user() && tx.state() == ACTIVE ? tx : null;
+ }
+
+ /**
+ * @return User transaction.
+ */
+ @SuppressWarnings({"unchecked"})
+ @Nullable public <T extends IgniteTxLocalEx<K, V>> T userTxx() {
+ return (T)userTx();
+ }
+
+ /**
+ * @param threadId Id of thread for transaction.
+ * @return Transaction for thread with given ID.
+ */
+ @SuppressWarnings({"unchecked"})
+ public <T> T tx(long threadId) {
+ return (T)threadMap.get(threadId);
+ }
+
+ /**
+ * @return {@code True} if current thread is currently within transaction.
+ */
+ public boolean inUserTx() {
+ return userTx() != null;
+ }
+
+ /**
+ * @param txId Transaction ID.
+ * @return Transaction with given ID.
+ */
+ @SuppressWarnings({"unchecked"})
+ @Nullable public <T extends IgniteTxEx<K, V>> T tx(GridCacheVersion txId)
{
+ return (T)idMap.get(txId);
+ }
+
+ /**
+ * @param txId Transaction ID.
+ * @return Transaction with given ID.
+ */
+ @SuppressWarnings({"unchecked"})
+ @Nullable public <T extends IgniteTxEx<K, V>> T nearTx(GridCacheVersion
txId) {
+ return (T)nearIdMap.get(txId);
+ }
+
+ /**
+ * @param txId Transaction ID.
+ * @return Transaction with given ID.
+ */
+ @Nullable public IgniteTxEx<K, V> txx(GridCacheVersion txId) {
+ return idMap.get(txId);
+ }
+
+ /**
+ * Handles prepare stage of 2PC.
+ *
+ * @param tx Transaction to prepare.
+ * @throws IgniteCheckedException If preparation failed.
+ */
+ public void prepareTx(IgniteTxEx<K, V> tx) throws IgniteCheckedException {
+ if (tx.state() == MARKED_ROLLBACK) {
+ if (tx.timedOut())
+ throw new IgniteTxTimeoutException("Transaction timed out: "
+ this);
+
+ throw new IgniteCheckedException("Transaction is marked for
rollback: " + tx);
+ }
+
+ if (tx.remainingTime() == 0) {
+ tx.setRollbackOnly();
+
+ throw new IgniteTxTimeoutException("Transaction timed out: " +
this);
+ }
+
+ boolean txSerializableEnabled =
cctx.txConfig().isTxSerializableEnabled();
+
+ // Clean up committed transactions queue.
+ if (tx.pessimistic()) {
+ if (tx.enforceSerializable() && txSerializableEnabled) {
+ for (Iterator<IgniteTxEx<K, V>> it = committedQ.iterator();
it.hasNext();) {
+ IgniteTxEx<K, V> committedTx = it.next();
+
+ assert committedTx != tx;
+
+ // Clean up.
+ if (isSafeToForget(committedTx))
+ it.remove();
+ }
+ }
+
+ // Nothing else to do in pessimistic mode.
+ return;
+ }
+
+ if (txSerializableEnabled && tx.optimistic() &&
tx.enforceSerializable()) {
+ Set<IgniteTxKey<K>> readSet = tx.readSet();
+ Set<IgniteTxKey<K>> writeSet = tx.writeSet();
+
+ GridCacheVersion startTn = tx.startVersion();
+
+ GridCacheVersion finishTn = cctx.versions().last();
+
+ // Add future to prepare queue only on first prepare call.
+ if (tx.markPreparing())
+ prepareQ.offer(tx);
+
+ // Check that our read set does not intersect with write set
+ // of all transactions that completed their write phase
+ // while our transaction was in read phase.
+ for (Iterator<IgniteTxEx<K, V>> it = committedQ.iterator();
it.hasNext();) {
+ IgniteTxEx<K, V> committedTx = it.next();
+
+ assert committedTx != tx;
+
+ // Clean up.
+ if (isSafeToForget(committedTx)) {
+ it.remove();
+
+ continue;
+ }
+
+ GridCacheVersion tn = committedTx.endVersion();
+
+ // We only care about transactions
+ // with tn > startTn and tn <= finishTn
+ if (tn.compareTo(startTn) <= 0 || tn.compareTo(finishTn) > 0)
+ continue;
+
+ if (tx.serializable()) {
+ if (GridFunc.intersects(committedTx.writeSet(), readSet))
{
+ tx.setRollbackOnly();
+
+ throw new IgniteTxOptimisticException("Failed to
prepare transaction " +
+ "(committed vs. read-set conflict): " + tx);
+ }
+ }
+ }
+
+ // Check that our read and write sets do not intersect with write
+ // sets of all active transactions.
+ for (Iterator<IgniteTxEx<K, V>> iter = prepareQ.iterator();
iter.hasNext();) {
+ IgniteTxEx<K, V> prepareTx = iter.next();
+
+ if (prepareTx == tx)
+ // Skip yourself.
+ continue;
+
+ // Optimistically remove completed transactions.
+ if (prepareTx.done()) {
+ iter.remove();
+
+ if (log.isDebugEnabled())
+ log.debug("Removed finished transaction from active
queue: " + prepareTx);
+
+ continue;
+ }
+
+ // Check if originating node left.
+ if (cctx.discovery().node(prepareTx.nodeId()) == null) {
+ iter.remove();
+
+ rollbackTx(prepareTx);
+
+ if (log.isDebugEnabled())
+ log.debug("Removed and rolled back transaction
because sender node left grid: " +
+ CU.txString(prepareTx));
+
+ continue;
+ }
+
+ if (tx.serializable() && !prepareTx.isRollbackOnly()) {
+ Set<IgniteTxKey<K>> prepareWriteSet =
prepareTx.writeSet();
+
+ if (GridFunc.intersects(prepareWriteSet, readSet,
writeSet)) {
+ // Remove from active set.
+ iter.remove();
+
+ tx.setRollbackOnly();
+
+ throw new IgniteTxOptimisticException(
+ "Failed to prepare transaction
(read-set/write-set conflict): " + tx);
+ }
+ }
+ }
+ }
+
+ // Optimistic.
+ assert tx.optimistic();
+
+ if (!lockMultiple(tx, tx.optimisticLockEntries())) {
+ tx.setRollbackOnly();
+
+ throw new IgniteTxOptimisticException("Failed to prepare
transaction (lock conflict): " + tx);
+ }
+ }
+
+ /**
+ * @param tx Transaction to check.
+ * @return {@code True} if transaction can be discarded.
+ */
+ private boolean isSafeToForget(IgniteTxEx<K, V> tx) {
+ Map.Entry<GridCacheVersion, AtomicInt> e = startVerCnts.firstEntry();
+
+ if (e == null)
+ return true;
+
+ assert e.getValue().get() >= 0;
+
+ return tx.endVersion().compareTo(e.getKey()) <= 0;
+ }
+
+ /**
+ * Decrement start version count.
+ *
+ * @param tx Cache transaction.
+ */
+ private void decrementStartVersionCount(IgniteTxEx<K, V> tx) {
+ AtomicInt cnt = startVerCnts.get(tx.startVersion());
+
+ assert cnt != null : "Failed to find start version count for
transaction [startVerCnts=" + startVerCnts +
+ ", tx=" + tx + ']';
+
+ assert cnt.get() > 0;
+
+ if (cnt.decrementAndGet() == 0)
+ if (startVerCnts.remove(tx.startVersion(), cnt))
+ if (log.isDebugEnabled())
+ log.debug("Removed start version for transaction: " + tx);
+ }
+
+ /**
+ * @param tx Transaction.
+ */
+ private void removeObsolete(IgniteTxEx<K, V> tx) {
+ Collection<IgniteTxEntry<K, V>> entries = (tx.local() && !tx.dht()) ?
tx.allEntries() : tx.writeEntries();
+
+ for (IgniteTxEntry<K, V> entry : entries) {
+ GridCacheEntryEx<K, V> cached = entry.cached();
+
+ GridCacheContext<K, V> cacheCtx = entry.context();
+
+ if (cached == null)
+ cached = cacheCtx.cache().peekEx(entry.key());
+
+ if (cached.detached())
+ continue;
+
+ try {
+ if (cached.obsolete() ||
cached.markObsoleteIfEmpty(tx.xidVersion()))
+ cacheCtx.cache().removeEntry(cached);
+
+ if (!tx.near() && isNearEnabled(cacheCtx)) {
+ GridNearCacheAdapter<K, V> near = cacheCtx.isNear() ?
cacheCtx.near() : cacheCtx.dht().near();
+
+ GridNearCacheEntry<K, V> e = near.peekExx(entry.key());
+
+ if (e != null && e.markObsoleteIfEmpty(tx.xidVersion()))
+ near.removeEntry(e);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to remove obsolete entry from cache: " +
cached, e);
+ }
+ }
+ }
+
+ /**
+ * @param c Collection to copy.
+ * @return Copy of the collection.
+ */
+ private Collection<GridCacheVersion> copyOf(Iterable<GridCacheVersion> c)
{
+ Collection<GridCacheVersion> l = new LinkedList<>();
+
+ for (GridCacheVersion v : c)
+ l.add(v);
+
+ return l;
+ }
+
+ /**
+ * Gets committed transactions starting from the given version
(inclusive). // TODO: GG-4011: why inclusive?
+ *
+ * @param min Start (or minimum) version.
+ * @return Committed transactions starting from the given version
(non-inclusive).
+ */
+ public Collection<GridCacheVersion> committedVersions(GridCacheVersion
min) {
+ Set<GridCacheVersion> set = committedVers.tailSet(min, true);
+
+ return set == null || set.isEmpty() ?
Collections.<GridCacheVersion>emptyList() : copyOf(set);
+ }
+
+ /**
+ * Gets rolledback transactions starting from the given version
(inclusive). // TODO: GG-4011: why inclusive?
+ *
+ * @param min Start (or minimum) version.
+ * @return Committed transactions starting from the given version
(non-inclusive).
+ */
+ public Collection<GridCacheVersion> rolledbackVersions(GridCacheVersion
min) {
+ Set<GridCacheVersion> set = rolledbackVers.tailSet(min, true);
+
+ return set == null || set.isEmpty() ?
Collections.<GridCacheVersion>emptyList() : copyOf(set);
+ }
+
+ /**
+ * @param tx Tx to remove.
+ */
+ public void removeCommittedTx(IgniteTxEx<K, V> tx) {
+ committedVers.remove(tx.xidVersion());
+ }
+
+ /**
+ * @param tx Committed transaction.
+ * @return If transaction was not already present in committed set.
+ */
+ public boolean addCommittedTx(IgniteTxEx<K, V> tx) {
+ return addCommittedTx(tx.xidVersion(), tx.nearXidVersion());
+ }
+
+ /**
+ * @param tx Committed transaction.
+ * @return If transaction was not already present in committed set.
+ */
+ public boolean addRolledbackTx(IgniteTxEx<K, V> tx) {
+ return addRolledbackTx(tx.xidVersion());
+ }
+
+ /**
+ * @param xidVer Completed transaction version.
+ * @param nearXidVer Optional near transaction ID.
+ * @return If transaction was not already present in completed set.
+ */
+ public boolean addCommittedTx(GridCacheVersion xidVer, @Nullable
GridCacheVersion nearXidVer) {
+ assert !rolledbackVers.contains(xidVer) : "Version was rolled back: "
+ xidVer;
+
+ if (nearXidVer != null)
+ xidVer = new CommittedVersion(xidVer, nearXidVer);
+
+ if (committedVers.add(xidVer)) {
+ if (log.isDebugEnabled())
+ log.debug("Added transaction to committed version set: " +
xidVer);
+
+ return true;
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Transaction is already present in committed
version set: " + xidVer);
+
+ return false;
+ }
+ }
+
+ /**
+ * @param xidVer Completed transaction version.
+ * @return If transaction was not already present in completed set.
+ */
+ public boolean addRolledbackTx(GridCacheVersion xidVer) {
+ assert !committedVers.contains(xidVer);
+
+ if (rolledbackVers.add(xidVer)) {
+ if (log.isDebugEnabled())
+ log.debug("Added transaction to rolled back version set: " +
xidVer);
+
+ return true;
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Transaction is already present in rolled back
version set: " + xidVer);
+
+ return false;
+ }
+ }
+
+ /**
+ * @param tx Transaction.
+ */
+ private void processCompletedEntries(IgniteTxEx<K, V> tx) {
+ if (tx.needsCompletedVersions()) {
+ GridCacheVersion min = minVersion(tx.readEntries(),
tx.xidVersion(), tx);
+
+ min = minVersion(tx.writeEntries(), min, tx);
+
+ assert min != null;
+
+ tx.completedVersions(min, committedVersions(min),
rolledbackVersions(min));
+ }
+ }
+
+ /**
+ * Collects versions for all pending locks for all entries within
transaction
+ *
+ * @param dhtTxLoc Transaction being committed.
+ */
+ private void collectPendingVersions(GridDhtTxLocal<K, V> dhtTxLoc) {
+ if (dhtTxLoc.needsCompletedVersions()) {
+ if (log.isDebugEnabled())
+ log.debug("Checking for pending locks with version less then
tx version: " + dhtTxLoc);
+
+ Set<GridCacheVersion> vers = new LinkedHashSet<>();
+
+ collectPendingVersions(dhtTxLoc.readEntries(),
dhtTxLoc.xidVersion(), vers);
+ collectPendingVersions(dhtTxLoc.writeEntries(),
dhtTxLoc.xidVersion(), vers);
+
+ if (!vers.isEmpty())
+ dhtTxLoc.pendingVersions(vers);
+ }
+ }
+
+ /**
+ * Gets versions of all not acquired locks for collection of tx entries
that are less then base version.
+ *
+ * @param entries Tx entries to process.
+ * @param baseVer Base version to compare with.
+ * @param vers Collection of versions that will be populated.
+ */
+ @SuppressWarnings("TypeMayBeWeakened")
+ private void collectPendingVersions(Iterable<IgniteTxEntry<K, V>> entries,
+ GridCacheVersion baseVer, Set<GridCacheVersion> vers) {
+
+ // The locks are not released yet, so we can safely list pending
candidates versions.
+ for (IgniteTxEntry<K, V> txEntry : entries) {
+ GridCacheEntryEx<K, V> cached = txEntry.cached();
+
+ try {
+ // If check should be faster then exception handling.
+ if (!cached.obsolete()) {
+ for (GridCacheMvccCandidate<K> cand :
cached.localCandidates()) {
+ if (!cand.owner() &&
cand.version().compareTo(baseVer) < 0) {
+ if (log.isDebugEnabled())
+ log.debug("Adding candidate version to
pending set: " + cand);
+
+ vers.add(cand.version());
+ }
+ }
+ }
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("There are no pending locks for entry (entry
was deleted in transaction): " + txEntry);
+ }
+ }
+ }
+
+ /**
+ * Go through all candidates for entries involved in transaction and find
their min
+ * version. We know that these candidates will commit after this
transaction, and
+ * therefore we can grab the min version so we can send all committed and
rolled
+ * back versions from min to current to remote nodes for re-ordering.
+ *
+ * @param entries Entries.
+ * @param min Min version so far.
+ * @param tx Transaction.
+ * @return Minimal available version.
+ */
+ private GridCacheVersion minVersion(Iterable<IgniteTxEntry<K, V>>
entries, GridCacheVersion min,
+ IgniteTxEx<K, V> tx) {
+ for (IgniteTxEntry<K, V> txEntry : entries) {
+ GridCacheEntryEx<K, V> cached = txEntry.cached();
+
+ // We are assuming that this method is only called on commit. In
that
+ // case, if lock is held, entry can never be removed.
+ assert txEntry.isRead() || !cached.obsolete(tx.xidVersion()) :
+ "Invalid obsolete version for transaction [entry=" + cached +
", tx=" + tx + ']';
+
+ for (GridCacheMvccCandidate<K> cand : cached.remoteMvccSnapshot())
+ if (min == null || cand.version().isLess(min))
+ min = cand.version();
+ }
+
+ return min;
+ }
+
+ /**
+ * Commits a transaction.
+ *
+ * @param tx Transaction to commit.
+ */
+ public void commitTx(IgniteTxEx<K, V> tx) {
+ assert tx != null;
+ assert tx.state() == COMMITTING : "Invalid transaction state for
commit from tm [state=" + tx.state() +
+ ", expected=COMMITTING, tx=" + tx + ']';
+
+ if (log.isDebugEnabled())
+ log.debug("Committing from TM [locNodeId=" + cctx.localNodeId() +
", tx=" + tx + ']');
+
+ if (tx.timeout() > 0) {
+ cctx.time().removeTimeoutObject(tx);
+
+ if (log.isDebugEnabled())
+ log.debug("Unregistered transaction with timeout processor: "
+ tx);
+ }
+
+ /*
+ * Note that write phase is handled by transaction adapter itself,
+ * so we don't do it here.
+ */
+
+ // 1. Make sure that committed version has been recorded.
+ if (!(committedVers.contains(tx.xidVersion()) ||
tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
+ uncommitTx(tx);
+
+ throw new IgniteException("Missing commit version (consider
increasing " +
+ GG_MAX_COMPLETED_TX_COUNT + " system property) [ver=" +
tx.xidVersion() + ", firstVer=" +
+ committedVers.firstx() + ", lastVer=" + committedVers.lastx()
+ ", tx=" + tx.xid() + ']');
+ }
+
+ ConcurrentMap<GridCacheVersion, IgniteTxEx<K, V>> txIdMap =
transactionMap(tx);
+
+ if (txIdMap.remove(tx.xidVersion(), tx)) {
+ // 2. Must process completed entries before unlocking!
+ processCompletedEntries(tx);
+
+ if (tx instanceof GridDhtTxLocal) {
+ GridDhtTxLocal<K, V> dhtTxLoc = (GridDhtTxLocal<K, V>)tx;
+
+ collectPendingVersions(dhtTxLoc);
+ }
+
+ // 3.1 Call dataStructures manager.
+ for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts())
+ cacheCtx.dataStructures().onTxCommitted(tx);
+
+ // 3.2 Add to pessimistic commit buffer if needed.
+ addPessimisticRecovery(tx);
+
+ // 4. Unlock write resources.
+ if (tx.groupLock())
+ unlockGroupLocks(tx);
+ else
+ unlockMultiple(tx, tx.writeEntries());
+
+ // 5. For pessimistic transaction, unlock read resources if
required.
+ if (tx.pessimistic() && !tx.readCommitted() && !tx.groupLock())
+ unlockMultiple(tx, tx.readEntries());
+
+ // 6. Notify evictions.
+ notifyEvitions(tx);
+
+ // 7. Remove obsolete entries from cache.
+ removeObsolete(tx);
+
+ // 8. Assign transaction number at the end of transaction.
+ tx.endVersion(cctx.versions().next(tx.topologyVersion()));
+
+ // 9. Clean start transaction number for this transaction.
+ if (cctx.txConfig().isTxSerializableEnabled())
+ decrementStartVersionCount(tx);
+
+ // 10. Add to committed queue only if it is possible
+ // that this transaction can affect other ones.
+ if (cctx.txConfig().isTxSerializableEnabled() &&
tx.enforceSerializable() && !isSafeToForget(tx))
+ committedQ.add(tx);
+
+ // 11. Remove from per-thread storage.
+ if (tx.local() && !tx.dht())
+ threadMap.remove(tx.threadId(), tx);
+
+ // 12. Unregister explicit locks.
+ if (!tx.alternateVersions().isEmpty()) {
+ for (GridCacheVersion ver : tx.alternateVersions())
+ idMap.remove(ver);
+ }
+
+ // 13. Remove Near-2-DHT mappings.
+ if (tx instanceof GridCacheMappedVersion) {
+ GridCacheVersion mapped =
((GridCacheMappedVersion)tx).mappedVersion();
+
+ if (mapped != null)
+ mappedVers.remove(mapped);
+ }
+
+ // 14. Clear context.
+ txContextReset();
+
+ // 15. Update metrics.
+ if (!tx.dht() && tx.local()) {
+ cctx.txMetrics().onTxCommit();
+
+ for (int cacheId : tx.activeCacheIds()) {
+ GridCacheContext<K, V> cacheCtx =
cctx.cacheContext(cacheId);
+
- cacheCtx.cache().metrics0().onTxCommit();
++ if
(cacheCtx.cache().configuration().isStatisticsEnabled())
++
cacheCtx.cache().metrics0().onTxCommit(System.nanoTime() - tx.startTime());
+ }
+ }
+
+ if (slowTxWarnTimeout > 0 && tx.local() &&
+ U.currentTimeMillis() - tx.startTime() > slowTxWarnTimeout)
+ U.warn(log, "Slow transaction detected [tx=" + tx +
+ ", slowTxWarnTimeout=" + slowTxWarnTimeout + ']') ;
+
+ if (log.isDebugEnabled())
+ log.debug("Committed from TM [locNodeId=" +
cctx.localNodeId() + ", tx=" + tx + ']');
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Did not commit from TM (was already committed): " +
tx);
+ }
+
+ /**
+ * Rolls back a transaction.
+ *
+ * @param tx Transaction to rollback.
+ */
+ public void rollbackTx(IgniteTxEx<K, V> tx) {
+ assert tx != null;
+
+ if (log.isDebugEnabled())
+ log.debug("Rolling back from TM [locNodeId=" + cctx.localNodeId()
+ ", tx=" + tx + ']');
+
+ // 1. Record transaction version to avoid duplicates.
+ addRolledbackTx(tx);
+
+ ConcurrentMap<GridCacheVersion, IgniteTxEx<K, V>> txIdMap =
transactionMap(tx);
+
+ if (txIdMap.remove(tx.xidVersion(), tx)) {
+ // 2. Unlock write resources.
+ unlockMultiple(tx, tx.writeEntries());
+
+ // 3. For pessimistic transaction, unlock read resources if
required.
+ if (tx.pessimistic() && !tx.readCommitted())
+ unlockMultiple(tx, tx.readEntries());
+
+ // 4. Notify evictions.
+ notifyEvitions(tx);
+
+ // 5. Remove obsolete entries.
+ removeObsolete(tx);
+
+ // 6. Clean start transaction number for this transaction.
+ if (cctx.txConfig().isTxSerializableEnabled())
+ decrementStartVersionCount(tx);
+
+ // 7. Remove from per-thread storage.
+ if (tx.local() && !tx.dht())
+ threadMap.remove(tx.threadId(), tx);
+
+ // 8. Unregister explicit locks.
+ if (!tx.alternateVersions().isEmpty())
+ for (GridCacheVersion ver : tx.alternateVersions())
+ idMap.remove(ver);
+
+ // 9. Remove Near-2-DHT mappings.
+ if (tx instanceof GridCacheMappedVersion)
+
mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion());
+
+ // 10. Clear context.
+ txContextReset();
+
+ // 11. Update metrics.
+ if (!tx.dht() && tx.local()) {
+ cctx.txMetrics().onTxRollback();
+
+ for (int cacheId : tx.activeCacheIds()) {
+ GridCacheContext<K, V> cacheCtx =
cctx.cacheContext(cacheId);
+
- cacheCtx.cache().metrics0().onTxRollback();
++
cacheCtx.cache().metrics0().onTxRollback(System.nanoTime() - tx.startTime());
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Rolled back from TM: " + tx);
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Did not rollback from TM (was already rolled back): "
+ tx);
+ }
+
+ /**
+ * Tries to minimize damage from partially-committed transaction.
+ *
+ * @param tx Tx to uncommit.
+ */
+ public void uncommitTx(IgniteTxEx<K, V> tx) {
+ assert tx != null;
+
+ if (log.isDebugEnabled())
+ log.debug("Uncommiting from TM: " + tx);
+
+ ConcurrentMap<GridCacheVersion, IgniteTxEx<K, V>> txIdMap =
transactionMap(tx);
+
+ if (txIdMap.remove(tx.xidVersion(), tx)) {
+ // 1. Unlock write resources.
+ unlockMultiple(tx, tx.writeEntries());
+
+ // 2. For pessimistic transaction, unlock read resources if
required.
+ if (tx.pessimistic() && !tx.readCommitted())
+ unlockMultiple(tx, tx.readEntries());
+
+ // 3. Notify evictions.
+ notifyEvitions(tx);
+
+ // 4. Clean start transaction number for this transaction.
+ if (cctx.txConfig().isTxSerializableEnabled())
+ decrementStartVersionCount(tx);
+
+ // 5. Remove from per-thread storage.
+ if (tx.local() && !tx.dht())
+ threadMap.remove(tx.threadId(), tx);
+
+ // 6. Unregister explicit locks.
+ if (!tx.alternateVersions().isEmpty())
+ for (GridCacheVersion ver : tx.alternateVersions())
+ idMap.remove(ver);
+
+ // 7. Remove Near-2-DHT mappings.
+ if (tx instanceof GridCacheMappedVersion)
+
mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion());
+
+ // 8. Clear context.
+ txContextReset();
+
+ if (log.isDebugEnabled())
+ log.debug("Uncommitted from TM: " + tx);
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Did not uncommit from TM (was already committed or
rolled back): " + tx);
+ }
+
+ /**
+ * Gets transaction ID map depending on transaction type.
+ *
+ * @param tx Transaction.
+ * @return Transaction map.
+ */
+ private ConcurrentMap<GridCacheVersion, IgniteTxEx<K, V>>
transactionMap(IgniteTxEx<K, V> tx) {
+ return (tx.near() && !tx.local()) ? nearIdMap : idMap;
+ }
+
+ /**
+ * @param tx Transaction to notify evictions for.
+ */
+ private void notifyEvitions(IgniteTxEx<K, V> tx) {
+ if (tx.internal() && !tx.groupLock())
+ return;
+
+ for (IgniteTxEntry<K, V> txEntry : tx.allEntries())
+ txEntry.cached().context().evicts().touch(txEntry, tx.local());
+ }
+
+ /**
+ * Callback invoked whenever a member of a transaction acquires
+ * lock ownership.
+ *
+ * @param entry Cache entry.
+ * @param owner Candidate that won ownership.
+ * @return {@code True} if transaction was notified, {@code false}
otherwise.
+ */
+ public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry,
GridCacheMvccCandidate<K> owner) {
+ // We only care about acquired locks.
+ if (owner != null) {
+ IgniteTxAdapter<K, V> tx = tx(owner.version());
+
+ if (tx == null)
+ tx = nearTx(owner.version());
+
+ if (tx != null) {
+ if (!tx.local()) {
+ if (log.isDebugEnabled())
+ log.debug("Found transaction for owner changed event
[owner=" + owner + ", entry=" + entry +
+ ", tx=" + tx + ']');
+
+ tx.onOwnerChanged(entry, owner);
+
+ return true;
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Ignoring local transaction for owner change
event: " + tx);
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Transaction not found for owner changed event
[owner=" + owner + ", entry=" + entry + ']');
+ }
+
+ return false;
+ }
+
+ /**
+ * Callback called by near finish future before sending near finish
request to remote node. Will increment
+ * per-thread counter so that further awaitAck call will wait for finish
response.
+ *
+ * @param rmtNodeId Remote node ID for which finish request is being sent.
+ * @param threadId Near tx thread ID.
+ */
+ public void beforeFinishRemote(UUID rmtNodeId, long threadId) {
+ if (finishSyncDisabled)
+ return;
+
+ assert txFinishSync != null;
+
+ txFinishSync.onFinishSend(rmtNodeId, threadId);
+ }
+
+ /**
+ * Callback invoked when near finish response is received from remote
node.
+ *
+ * @param rmtNodeId Remote node ID from which response is received.
+ * @param threadId Near tx thread ID.
+ */
+ public void onFinishedRemote(UUID rmtNodeId, long threadId) {
+ if (finishSyncDisabled)
+ return;
+
+ assert txFinishSync != null;
+
+ txFinishSync.onAckReceived(rmtNodeId, threadId);
+ }
+
+ /**
+ * Asynchronously waits for last finish request ack.
+ *
+ * @param rmtNodeId Remote node ID.
+ * @param threadId Near tx thread ID.
+ * @return {@code null} if ack was received or future that will be
completed when ack is received.
+ */
+ @Nullable public IgniteFuture<?> awaitFinishAckAsync(UUID rmtNodeId, long
threadId) {
+ if (finishSyncDisabled)
+ return null;
+
+ assert txFinishSync != null;
+
+ return txFinishSync.awaitAckAsync(rmtNodeId, threadId);
+ }
+
+ /**
+ * For test purposes only.
+ *
+ * @param finishSyncDisabled {@code True} if finish sync should be
disabled.
+ */
+ public void finishSyncDisabled(boolean finishSyncDisabled) {
+ this.finishSyncDisabled = finishSyncDisabled;
+ }
+
+ /**
+ * @param tx Transaction.
+ * @param entries Entries to lock.
+ * @return {@code True} if all keys were locked.
+ * @throws IgniteCheckedException If lock has been cancelled.
+ */
+ private boolean lockMultiple(IgniteTxEx<K, V> tx,
Iterable<IgniteTxEntry<K, V>> entries)
+ throws IgniteCheckedException {
+ assert tx.optimistic();
+
+ long remainingTime = U.currentTimeMillis() - (tx.startTime() +
tx.timeout());
+
+ // For serializable transactions, failure to acquire lock means
+ // that there is a serializable conflict. For all other isolation
levels,
+ // we wait for the lock.
+ long timeout = tx.timeout() == 0 ? 0 : remainingTime;
+
+ for (IgniteTxEntry<K, V> txEntry1 : entries) {
+ // Check if this entry was prepared before.
+ if (!txEntry1.markPrepared())
+ continue;
+
+ GridCacheContext<K, V> cacheCtx = txEntry1.context();
+
+ while (true) {
+ try {
+ GridCacheEntryEx<K, V> entry1 = txEntry1.cached();
+
+ assert !entry1.detached() : "Expected non-detached entry
for near transaction " +
+ "[locNodeId=" + cctx.localNodeId() + ", entry=" +
entry1 + ']';
+
+ if (!entry1.tmLock(tx, timeout)) {
+ // Unlock locks locked so far.
+ for (IgniteTxEntry<K, V> txEntry2 : entries) {
+ if (txEntry2 == txEntry1)
+ break;
+
+ txEntry2.cached().txUnlock(tx);
+ }
+
+ return false;
+ }
+
+ entry1.unswap();
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in TM lockMultiple(..)
method (will retry): " + txEntry1);
+
+ try {
+ // Renew cache entry.
+
txEntry1.cached(cacheCtx.cache().entryEx(txEntry1.key()), txEntry1.keyBytes());
+ }
+ catch (GridDhtInvalidPartitionException e) {
+ assert tx.dht() : "Received invalid partition for non
DHT transaction [tx=" +
+ tx + ", invalidPart=" + e.partition() + ']';
+
+ // If partition is invalid, we ignore this entry.
+ tx.addInvalidPartition(cacheCtx, e.partition());
+
+ break;
+ }
+ }
+ catch (GridDistributedLockCancelledException ignore) {
+ tx.setRollbackOnly();
+
+ throw new IgniteCheckedException("Entry lock has been
cancelled for transaction: " + tx);
+ }
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Unlocks entries locked by group transaction.
+ *
+ * @param txx Transaction.
+ */
+ @SuppressWarnings("unchecked")
+ private void unlockGroupLocks(IgniteTxEx txx) {
+ IgniteTxKey grpLockKey = txx.groupLockKey();
+
+ assert grpLockKey != null;
+
+ if (grpLockKey == null)
+ return;
+
+ IgniteTxEntry txEntry = txx.entry(grpLockKey);
+
+ assert txEntry != null || (txx.near() && !txx.local());
+
+ if (txEntry != null) {
+ GridCacheContext cacheCtx = txEntry.context();
+
+ // Group-locked entries must be locked.
+ while (true) {
+ try {
+ GridCacheEntryEx<K, V> entry = txEntry.cached();
+
+ assert entry != null;
+
+ entry.txUnlock(txx);
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in TM
unlockGroupLocks(..) method (will retry): " + txEntry);
+
+ GridCacheAdapter cache = cacheCtx.cache();
+
+ // Renew cache entry.
+ txEntry.cached(cache.entryEx(txEntry.key()),
txEntry.keyBytes());
+ }
+ }
+ }
+ }
+
+ /**
+ * @param tx Owning transaction.
+ * @param entries Entries to unlock.
+ */
+ private void unlockMultiple(IgniteTxEx<K, V> tx,
Iterable<IgniteTxEntry<K, V>> entries) {
+ for (IgniteTxEntry<K, V> txEntry : entries) {
+ GridCacheContext<K, V> cacheCtx = txEntry.context();
+
+ while (true) {
+ try {
+ GridCacheEntryEx<K, V> entry = txEntry.cached();
+
+ if (entry.detached())
+ break;
+
+ assert entry != null;
+
+ entry.txUnlock(tx);
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in TM unlockMultiple(..)
method (will retry): " + txEntry);
+
+ // Renew cache entry.
+ txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()),
txEntry.keyBytes());
+ }
+ }
+ }
+ }
+
+ /**
+ * @param sync Transaction synchronizations to add.
+ */
+ public void addSynchronizations(IgniteTxSynchronization... sync) {
+ if (F.isEmpty(sync))
+ return;
+
+ F.copy(syncs, sync);
+ }
+
+ /**
+ * @param sync Transaction synchronizations to remove.
+ */
+ public void removeSynchronizations(IgniteTxSynchronization... sync) {
+ if (F.isEmpty(sync))
+ return;
+
+ F.lose(syncs, false, Arrays.asList(sync));
+ }
+
+ /**
+ * @return Registered transaction synchronizations
+ */
+ public Collection<IgniteTxSynchronization> synchronizations() {
+ return Collections.unmodifiableList(new LinkedList<>(syncs));
+ }
+
+ /**
+ * @param prevState Previous state.
+ * @param newState New state.
+ * @param tx Cache transaction.
+ */
+ public void onTxStateChange(@Nullable IgniteTxState prevState,
IgniteTxState newState, IgniteTx tx) {
+ // Notify synchronizations.
+ for (IgniteTxSynchronization s : syncs)
+ s.onStateChanged(prevState, newState, tx);
+ }
+
+ /**
+ * @param tx Committing transaction.
+ */
+ public void txContext(IgniteTxEx tx) {
+ threadCtx.set(tx);
+ }
+
+ /**
+ * @return Currently committing transaction.
+ */
+ @SuppressWarnings({"unchecked"})
+ private IgniteTxEx<K, V> txContext() {
+ return threadCtx.get();
+ }
+
+ /**
+ * Gets version of transaction in tx context or {@code null}
+ * if tx context is empty.
+ * <p>
+ * This is a convenience method provided mostly for debugging.
+ *
+ * @return Transaction version from transaction context.
+ */
+ @Nullable public GridCacheVersion txContextVersion() {
+ IgniteTxEx<K, V> tx = txContext();
+
+ return tx == null ? null : tx.xidVersion();
+ }
+
+ /**
+ * Commit ended.
+ */
+ public void txContextReset() {
+ threadCtx.set(null);
+ }
+
+ /**
+ * @return All transactions.
+ */
+ public Collection<IgniteTxEx<K, V>> txs() {
+ return F.concat(false, idMap.values(), nearIdMap.values());
+ }
+
+ /**
+ * @return Slow tx warn timeout.
+ */
+ public int slowTxWarnTimeout() {
+ return slowTxWarnTimeout;
+ }
+
+ /**
+ * @param slowTxWarnTimeout Slow tx warn timeout.
+ */
+ public void slowTxWarnTimeout(int slowTxWarnTimeout) {
+ this.slowTxWarnTimeout = slowTxWarnTimeout;
+ }
+
+ /**
+ * Checks if transactions with given near version ID was prepared or
committed.
+ *
+ * @param nearVer Near version ID.
+ * @param txNum Number of transactions.
+ * @return {@code True} if transactions were prepared or committed.
+ */
+ public boolean txsPreparedOrCommitted(GridCacheVersion nearVer, int
txNum) {
+ Collection<GridCacheVersion> processedVers = null;
+
+ for (IgniteTxEx<K, V> tx : txs()) {
+ if (nearVer.equals(tx.nearXidVersion())) {
+ IgniteTxState state = tx.state();
+
+ if (state == PREPARED || state == COMMITTING || state ==
COMMITTED) {
+ if (--txNum == 0)
+ return true;
+ }
+ else {
+ if (tx.state(MARKED_ROLLBACK) || tx.state() == UNKNOWN) {
+ tx.rollbackAsync();
+
+ if (log.isDebugEnabled())
+ log.debug("Transaction was not prepared (rolled
back): " + tx);
+
+ return false;
+ }
+ else {
+ if (tx.state() == COMMITTED) {
+ if (--txNum == 0)
+ return true;
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Transaction is not prepared: " +
tx);
+
+ return false;
+ }
+ }
+ }
+
+ if (processedVers == null)
+ processedVers = new HashSet<>(txNum, 1.0f);
+
+ processedVers.add(tx.xidVersion());
+ }
+ }
+
+ // Not all transactions were found. Need to scan committed versions
to check
+ // if transaction was already committed.
+ for (GridCacheVersion ver : committedVers) {
+ if (processedVers != null && processedVers.contains(ver))
+ continue;
+
+ if (ver instanceof CommittedVersion) {
+ CommittedVersion commitVer = (CommittedVersion)ver;
+
+ if (commitVer.nearVer.equals(nearVer)) {
+ if (--txNum == 0)
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Adds transaction to pessimistic recovery buffer if needed.
+ *
+ * @param tx Committed transaction to add.
+ */
+ private void addPessimisticRecovery(IgniteTxEx<K, V> tx) {
+ if (pessimisticRecoveryBuf == null)
+ return;
+
+ // Do not store recovery information for optimistic or replicated
local transactions.
+ if (tx.optimistic() || (tx.local() && tx.replicated()))
+ return;
+
+ pessimisticRecoveryBuf.addCommittedTx(tx);
+ }
+
+ /**
+ * Checks whether transaction with given near version was committed on
this node and returns commit info.
+ *
+ * @param nearTxVer Near tx version.
+ * @param originatingNodeId Originating node ID.
+ * @param originatingThreadId Originating thread ID.
+ * @return Commit info, if present.
+ */
+ @Nullable public GridCacheCommittedTxInfo<K, V>
txCommitted(GridCacheVersion nearTxVer,
+ UUID originatingNodeId, long originatingThreadId) {
+ assert pessimisticRecoveryBuf != null : "Should not be called for
LOCAL cache.";
+
+ return pessimisticRecoveryBuf.committedTx(nearTxVer,
originatingNodeId, originatingThreadId);
+ }
+
+ /**
+ * Gets local transaction for pessimistic tx recovery.
+ *
+ * @param nearXidVer Near tx ID.
+ * @return Near local or colocated local transaction.
+ */
+ @Nullable public IgniteTxEx<K, V> localTxForRecovery(GridCacheVersion
nearXidVer, boolean markFinalizing) {
+ // First check if we have near transaction with this ID.
+ IgniteTxEx<K, V> tx = idMap.get(nearXidVer);
+
+ if (tx == null) {
+ // Check all local transactions and mark them as waiting for
recovery to prevent finish race.
+ for (IgniteTxEx<K, V> txEx : idMap.values()) {
+ if (nearXidVer.equals(txEx.nearXidVersion())) {
+ if (!markFinalizing ||
!txEx.markFinalizing(RECOVERY_WAIT))
+ tx = txEx;
+ }
+ }
+ }
+
+ // Either we found near transaction or one of transactions is being
committed by user.
+ // Wait for it and send reply.
+ if (tx != null && tx.local())
+ return tx;
+
+ return null;
+ }
+
+ /**
+ * Commits or rolls back prepared transaction.
+ *
+ * @param tx Transaction.
+ * @param commit Whether transaction should be committed or rolled back.
+ */
+ public void finishOptimisticTxOnRecovery(final IgniteTxEx<K, V> tx,
boolean commit) {
+ if (log.isDebugEnabled())
+ log.debug("Finishing prepared transaction [tx=" + tx + ",
commit=" + commit + ']');
+
+ if (!tx.markFinalizing(RECOVERY_FINISH)) {
+ if (log.isDebugEnabled())
+ log.debug("Will not try to commit prepared transaction (could
not mark finalized): " + tx);
+
+ return;
+ }
+
+ if (tx instanceof GridDistributedTxRemoteAdapter) {
+ IgniteTxRemoteEx<K,V> rmtTx = (IgniteTxRemoteEx<K, V>)tx;
+
+ rmtTx.doneRemote(tx.xidVersion(),
Collections.<GridCacheVersion>emptyList(),
Collections.<GridCacheVersion>emptyList(),
+ Collections.<GridCacheVersion>emptyList());
+ }
+
+ if (commit)
+ tx.commitAsync().listenAsync(new CommitListener(tx));
+ else
+ tx.rollbackAsync();
+ }
+
+ /**
+ * Commits or rolls back pessimistic transaction.
+ *
+ * @param tx Transaction to finish.
+ * @param commitInfo Commit information.
+ */
+ public void finishPessimisticTxOnRecovery(final IgniteTxEx<K, V> tx,
GridCacheCommittedTxInfo<K, V> commitInfo) {
+ if (!tx.markFinalizing(RECOVERY_FINISH)) {
+ if (log.isDebugEnabled())
+ log.debug("Will not try to finish pessimistic transaction
(could not mark as finalizing): " + tx);
+
+ return;
+ }
+
+ if (tx instanceof GridDistributedTxRemoteAdapter) {
+ IgniteTxRemoteEx<K,V> rmtTx = (IgniteTxRemoteEx<K, V>)tx;
+
+ rmtTx.doneRemote(tx.xidVersion(),
Collections.<GridCacheVersion>emptyList(),
Collections.<GridCacheVersion>emptyList(),
+ Collections.<GridCacheVersion>emptyList());
+ }
+
+ try {
+ tx.prepare();
+
+ if (commitInfo != null) {
+ for (IgniteTxEntry<K, V> entry : commitInfo.recoveryWrites())
{
+ IgniteTxEntry<K, V> write =
tx.writeMap().get(entry.txKey());
+
+ if (write != null) {
+ GridCacheEntryEx<K, V> cached = write.cached();
+
+ IgniteTxEntry<K, V> recovered =
entry.cleanCopy(write.context());
+
+ if (cached == null || cached.detached())
+ cached =
write.context().cache().entryEx(entry.key(), tx.topologyVersion());
+
+ recovered.cached(cached, cached.keyBytes());
+
+ tx.writeMap().put(entry.txKey(), recovered);
+
+ continue;
+ }
+
+ ((IgniteTxAdapter<K,
V>)tx).recoveryWrites(commitInfo.recoveryWrites());
+
+ // If write was not found, check read.
+ IgniteTxEntry<K, V> read =
tx.readMap().remove(entry.txKey());
+
+ if (read != null)
+ tx.writeMap().put(entry.txKey(), entry);
+ }
+
+ tx.commitAsync().listenAsync(new CommitListener(tx));
+ }
+ else
+ tx.rollbackAsync();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to prepare pessimistic transaction (will
invalidate): " + tx, e);
+
+ salvageTx(tx);
+ }
+ }
+
+ /**
+ * @param req Check committed request.
+ * @return Check committed future.
+ */
+ public IgniteFuture<GridCacheCommittedTxInfo<K, V>>
checkPessimisticTxCommitted(GridCachePessimisticCheckCommittedTxRequest req) {
+ // First check if we have near transaction with this ID.
+ IgniteTxEx<K, V> tx = localTxForRecovery(req.nearXidVersion(),
!req.nearOnlyCheck());
+
+ // Either we found near transaction or one of transactions is being
committed by user.
+ // Wait for it and send reply.
+ if (tx != null) {
+ assert tx.local();
+
+ if (log.isDebugEnabled())
+ log.debug("Found active near transaction, will wait for
completion [req=" + req + ", tx=" + tx + ']');
+
+ final IgniteTxEx<K, V> tx0 = tx;
+
+ return tx.finishFuture().chain(new C1<IgniteFuture<IgniteTx>,
GridCacheCommittedTxInfo<K, V>>() {
+ @Override public GridCacheCommittedTxInfo<K, V>
apply(IgniteFuture<IgniteTx> txFut) {
+ GridCacheCommittedTxInfo<K, V> info = null;
+
+ if (tx0.state() == COMMITTED)
+ info = new GridCacheCommittedTxInfo<>(tx0);
+
+ return info;
+ }
+ });
+ }
+
+ GridCacheCommittedTxInfo<K, V> info =
txCommitted(req.nearXidVersion(), req.originatingNodeId(),
+ req.originatingThreadId());
+
+ if (info == null)
+ info = txCommitted(req.nearXidVersion(), req.originatingNodeId(),
req.originatingThreadId());
+
+ return new GridFinishedFutureEx<>(info);
+ }
+
+ /**
+ * Timeout object for node failure handler.
+ */
+ private final class NodeFailureTimeoutObject extends
GridTimeoutObjectAdapter {
+ /** Left or failed node. */
+ private final UUID evtNodeId;
+
+ /**
+ * @param evtNodeId Event node ID.
+ */
+ private NodeFailureTimeoutObject(UUID evtNodeId) {
+ super(IgniteUuid.fromUuid(cctx.localNodeId()),
TX_SALVAGE_TIMEOUT);
+
+ this.evtNodeId = evtNodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
+ try {
+ cctx.kernalContext().gateway().readLock();
+ }
+ catch (IllegalStateException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to acquire kernal gateway (grid is
stopping).");
+
+ return;
+ }
+
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Processing node failed event [locNodeId=" +
cctx.localNodeId() +
+ ", failedNodeId=" + evtNodeId + ']');
+
+ for (IgniteTxEx<K, V> tx : txs()) {
+ if ((tx.near() && !tx.local()) || (tx.storeUsed() &&
tx.masterNodeIds().contains(evtNodeId))) {
+ // Invalidate transactions.
+ salvageTx(tx, false, RECOVERY_FINISH);
+ }
+ else if (tx.optimistic()) {
+ // Check prepare only if originating node ID failed.
Otherwise parent node will finish this tx.
+ if (tx.originatingNodeId().equals(evtNodeId)) {
+ if (tx.state() == PREPARED)
+ commitIfPrepared(tx);
+ else {
+ if (tx.setRollbackOnly())
+ tx.rollbackAsync();
+ // If we could not mark tx as rollback, it
means that transaction is being committed.
+ }
+ }
+ }
+ else {
+ // Pessimistic.
+ if (tx.originatingNodeId().equals(evtNodeId)) {
+ if (tx.state() != COMMITTING && tx.state() !=
COMMITTED)
+ commitIfRemotelyCommitted(tx);
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Skipping pessimistic
transaction check (transaction is being committed) " +
+ "[tx=" + tx + ", locNodeId=" +
cctx.localNodeId() + ']');
+ }
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Skipping pessimistic transaction
check [tx=" + tx +
+ ", evtNodeId=" + evtNodeId + ",
locNodeId=" + cctx.localNodeId() + ']');
+ }
+ }
+ }
+ }
+ finally {
+ cctx.kernalContext().gateway().readUnlock();
+ }
+ }
+
+ /**
+ * Commits optimistic transaction in case when node started
transaction failed, but all related
+ * transactions were prepared (invalidates transaction if it is not
fully prepared).
+ *
+ * @param tx Transaction.
+ */
+ private void commitIfPrepared(IgniteTxEx<K, V> tx) {
+ assert tx instanceof GridDhtTxLocal || tx instanceof
GridDhtTxRemote : tx;
+ assert !F.isEmpty(tx.transactionNodes());
+ assert tx.nearXidVersion() != null;
+
+
+ GridCacheOptimisticCheckPreparedTxFuture<K, V> fut = new
GridCacheOptimisticCheckPreparedTxFuture<>(
+ cctx, tx, evtNodeId, tx.transactionNodes());
+
+ cctx.mvcc().addFuture(fut);
+
+ if (log.isDebugEnabled())
+ log.debug("Checking optimistic transaction state on remote
nodes [tx=" + tx + ", fut=" + fut + ']');
+
+ fut.prepare();
+ }
+
+ /**
+ * Commits pessimistic transaction if at least one of remote nodes
has committed this transaction.
+ *
+ * @param tx Transaction.
+ */
+ private void commitIfRemotelyCommitted(IgniteTxEx<K, V> tx) {
+ assert tx instanceof GridDhtTxLocal || tx instanceof
GridDhtTxRemote : tx;
+
+ GridCachePessimisticCheckCommittedTxFuture<K, V> fut = new
GridCachePessimisticCheckCommittedTxFuture<>(
+ cctx, tx, evtNodeId);
+
+ cctx.mvcc().addFuture(fut);
+
+ if (log.isDebugEnabled())
+ log.debug("Checking pessimistic transaction state on remote
nodes [tx=" + tx + ", fut=" + fut + ']');
+
+ fut.prepare();
+ }
+ }
+
+ /**
+ *
+ */
+ private static class CommittedVersion extends GridCacheVersion {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Corresponding near version. Transient. */
+ private GridCacheVersion nearVer;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public CommittedVersion() {
+ // No-op.
+ }
+
+ /**
+ * @param ver Committed version.
+ * @param nearVer Near transaction version.
+ */
+ private CommittedVersion(GridCacheVersion ver, GridCacheVersion
nearVer) {
+ super(ver.topologyVersion(), ver.globalTime(), ver.order(),
ver.nodeOrder(), ver.dataCenterId());
+
+ assert nearVer != null;
+
+ this.nearVer = nearVer;
+ }
+ }
+
+ /**
+ * Atomic integer that compares only using references, not values.
+ */
+ private static final class AtomicInt extends AtomicInteger {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * @param initVal Initial value.
+ */
+ private AtomicInt(int initVal) {
+ super(initVal);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ // Reference only.
+ return obj == this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return super.hashCode();
+ }
+ }
+
+ /**
+ * Commit listener. Checks if commit succeeded and rollbacks if case of
error.
+ */
+ private class CommitListener implements CI1<IgniteFuture<IgniteTx>> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Transaction. */
+ private final IgniteTxEx<K, V> tx;
+
+ /**
+ * @param tx Transaction.
+ */
+ private CommitListener(IgniteTxEx<K, V> tx) {
+ this.tx = tx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void apply(IgniteFuture<IgniteTx> t) {
+ try {
+ t.get();
+ }
+ catch (IgniteTxOptimisticException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Optimistic failure while committing prepared
transaction (will rollback): " +
+ tx);
+
+ tx.rollbackAsync();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to commit transaction during failover: "
+ tx, e);
+ }
+ }
+ }
+ }