http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index 8b6a693..5dfcf2a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -17,6 +17,7 @@ import org.gridgain.grid.kernal.managers.discovery.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; +import org.gridgain.grid.kernal.processors.cache.transactions.*; import org.gridgain.grid.util.future.*; import org.gridgain.grid.util.lang.*; import org.gridgain.grid.util.tostring.*; @@ -36,8 +37,8 @@ import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*; /** * */ -public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFuture<GridCacheTxEx<K, V>> - implements GridCacheMvccFuture<K, V, GridCacheTxEx<K, V>> { +public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFuture<IgniteTxEx<K, V>> + implements GridCacheMvccFuture<K, V, IgniteTxEx<K, V>> { /** */ private static final long serialVersionUID = 0L; @@ -79,12 +80,12 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut * @param tx Transaction. */ public GridNearTxPrepareFuture(GridCacheSharedContext<K, V> cctx, final GridNearTxLocal<K, V> tx) { - super(cctx.kernalContext(), new IgniteReducer<GridCacheTxEx<K, V>, GridCacheTxEx<K, V>>() { - @Override public boolean collect(GridCacheTxEx<K, V> e) { + super(cctx.kernalContext(), new IgniteReducer<IgniteTxEx<K, V>, IgniteTxEx<K, V>>() { + @Override public boolean collect(IgniteTxEx<K, V> e) { return true; } - @Override public GridCacheTxEx<K, V> reduce() { + @Override public IgniteTxEx<K, V> reduce() { // Nothing to aggregate. return tx; } @@ -202,11 +203,11 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut * @return {@code True} if all locks are owned. */ private boolean checkLocks() { - Collection<GridCacheTxEntry<K, V>> checkEntries = tx.groupLock() ? + Collection<IgniteTxEntry<K, V>> checkEntries = tx.groupLock() ? Collections.singletonList(tx.groupLockEntry()) : tx.writeEntries(); - for (GridCacheTxEntry<K, V> txEntry : checkEntries) { + for (IgniteTxEntry<K, V> txEntry : checkEntries) { // Wait for near locks only. if (!txEntry.context().isNear()) continue; @@ -251,7 +252,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut */ public void onResult(UUID nodeId, GridNearTxPrepareResponse<K, V> res) { if (!isDone()) { - for (IgniteFuture<GridCacheTxEx<K, V>> fut : pending()) { + for (IgniteFuture<IgniteTxEx<K, V>> fut : pending()) { if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; @@ -266,7 +267,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut } /** {@inheritDoc} */ - @Override public boolean onDone(GridCacheTxEx<K, V> t, Throwable err) { + @Override public boolean onDone(IgniteTxEx<K, V> t, Throwable err) { // If locks were not acquired yet, delay completion. if (isDone() || (err == null && !checkLocks())) return false; @@ -434,7 +435,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut try { prepare( - tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<GridCacheTxEntry<K, V>>emptyList(), + tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry<K, V>>emptyList(), tx.writeEntries()); markInitialized(); @@ -450,8 +451,8 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut * @throws IgniteCheckedException If transaction is group-lock and some key was mapped to to the local node. */ private void prepare( - Iterable<GridCacheTxEntry<K, V>> reads, - Iterable<GridCacheTxEntry<K, V>> writes + Iterable<IgniteTxEntry<K, V>> reads, + Iterable<IgniteTxEntry<K, V>> writes ) throws IgniteCheckedException { assert tx.optimistic(); @@ -482,7 +483,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut // Assign keys to primary nodes. GridDistributedTxMapping<K, V> cur = null; - for (GridCacheTxEntry<K, V> read : reads) { + for (IgniteTxEntry<K, V> read : reads) { GridDistributedTxMapping<K, V> updated = map(read, topVer, cur); if (cur != updated) { @@ -499,7 +500,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut } } - for (GridCacheTxEntry<K, V> write : writes) { + for (IgniteTxEntry<K, V> write : writes) { GridDistributedTxMapping<K, V> updated = map(write, topVer, cur); if (cur != updated) { @@ -565,7 +566,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut tx.subjectId(), tx.taskNameHash()); - for (GridCacheTxEntry<K, V> txEntry : m.writes()) { + for (IgniteTxEntry<K, V> txEntry : m.writes()) { if (txEntry.op() == TRANSFORM) req.addDhtVersion(txEntry.txKey(), null); } @@ -589,21 +590,21 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut // At this point, if any new node joined, then it is // waiting for this transaction to complete, so // partition reassignments are not possible here. - IgniteFuture<GridCacheTxEx<K, V>> fut = cctx.tm().txHandler().prepareTx(n.id(), tx, req); + IgniteFuture<IgniteTxEx<K, V>> fut = cctx.tm().txHandler().prepareTx(n.id(), tx, req); // Add new future. add(new GridEmbeddedFuture<>( cctx.kernalContext(), fut, - new C2<GridCacheTxEx<K, V>, Exception, GridCacheTxEx<K, V>>() { - @Override public GridCacheTxEx<K, V> apply(GridCacheTxEx<K, V> t, Exception ex) { + new C2<IgniteTxEx<K, V>, Exception, IgniteTxEx<K, V>>() { + @Override public IgniteTxEx<K, V> apply(IgniteTxEx<K, V> t, Exception ex) { if (ex != null) { onError(n.id(), mappings, ex); return t; } - GridCacheTxLocalEx<K, V> dhtTx = (GridCacheTxLocalEx<K, V>)t; + IgniteTxLocalEx<K, V> dhtTx = (IgniteTxLocalEx<K, V>)t; Collection<Integer> invalidParts = dhtTx.invalidPartitions(); @@ -616,7 +617,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut GridCacheVersion min = dhtTx.minVersion(); - GridCacheTxManager<K, V> tm = cctx.tm(); + IgniteTxManager<K, V> tm = cctx.tm(); tx.readyNearLocks(m, Collections.<GridCacheVersion>emptyList(), tm.committedVersions(min), tm.rolledbackVersions(min)); @@ -657,7 +658,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut * @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key. * @return Mapping. */ - private GridDistributedTxMapping<K, V> map(GridCacheTxEntry<K, V> entry, long topVer, + private GridDistributedTxMapping<K, V> map(IgniteTxEntry<K, V> entry, long topVer, GridDistributedTxMapping<K, V> cur) throws IgniteCheckedException { GridCacheContext<K, V> cacheCtx = entry.context(); @@ -725,7 +726,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut * Mini-future for get operations. Mini-futures are only waiting on a single * node as opposed to multiple nodes. */ - private class MiniFuture extends GridFutureAdapter<GridCacheTxEx<K, V>> { + private class MiniFuture extends GridFutureAdapter<IgniteTxEx<K, V>> { /** */ private static final long serialVersionUID = 0L; @@ -831,8 +832,8 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut else { assert F.isEmpty(res.invalidPartitions()); - for (Map.Entry<GridCacheTxKey<K>, GridTuple3<GridCacheVersion, V, byte[]>> entry : res.ownedValues().entrySet()) { - GridCacheTxEntry<K, V> txEntry = tx.entry(entry.getKey()); + for (Map.Entry<IgniteTxKey<K>, GridTuple3<GridCacheVersion, V, byte[]>> entry : res.ownedValues().entrySet()) { + IgniteTxEntry<K, V> txEntry = tx.entry(entry.getKey()); assert txEntry != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index 431e134..46e1b8a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -13,6 +13,7 @@ import org.apache.ignite.lang.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.*; +import org.gridgain.grid.kernal.processors.cache.transactions.*; import org.gridgain.grid.util.direct.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; @@ -79,10 +80,10 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ public GridNearTxPrepareRequest( IgniteUuid futId, long topVer, - GridCacheTxEx<K, V> tx, - Collection<GridCacheTxEntry<K, V>> reads, - Collection<GridCacheTxEntry<K, V>> writes, - GridCacheTxKey grpLockKey, + IgniteTxEx<K, V> tx, + Collection<IgniteTxEntry<K, V>> reads, + Collection<IgniteTxEntry<K, V>> writes, + IgniteTxKey grpLockKey, boolean partLock, boolean near, Map<UUID, Collection<UUID>> txNodes, @@ -182,13 +183,13 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ * @param c Collection of entries to clone. * @return Cloned collection. */ - private Collection<GridCacheTxEntry<K, V>> cloneEntries(Collection<GridCacheTxEntry<K, V>> c) { + private Collection<IgniteTxEntry<K, V>> cloneEntries(Collection<IgniteTxEntry<K, V>> c) { if (F.isEmpty(c)) return c; - Collection<GridCacheTxEntry<K, V>> cp = new ArrayList<>(c.size()); + Collection<IgniteTxEntry<K, V>> cp = new ArrayList<>(c.size()); - for (GridCacheTxEntry<K, V> e : c) { + for (IgniteTxEntry<K, V> e : c) { GridCacheContext<K, V> cacheCtx = e.context(); // Clone only if it is a near cache. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index 08d7967..e453388 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@ -14,6 +14,7 @@ import org.apache.ignite.lang.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.*; +import org.gridgain.grid.kernal.processors.cache.transactions.*; import org.gridgain.grid.util.direct.*; import org.gridgain.grid.util.lang.*; import org.gridgain.grid.util.tostring.*; @@ -53,7 +54,7 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes /** Map of owned values to set on near node. */ @GridToStringInclude @GridDirectTransient - private Map<GridCacheTxKey<K>, GridTuple3<GridCacheVersion, V, byte[]>> ownedVals; + private Map<IgniteTxKey<K>, GridTuple3<GridCacheVersion, V, byte[]>> ownedVals; /** Marshalled owned bytes. */ @GridToStringExclude @@ -136,7 +137,7 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes * @param val Value. * @param valBytes Value bytes. */ - public void addOwnedValue(GridCacheTxKey<K> key, GridCacheVersion ver, V val, byte[] valBytes) { + public void addOwnedValue(IgniteTxKey<K> key, GridCacheVersion ver, V val, byte[] valBytes) { if (ownedVals == null) ownedVals = new HashMap<>(); @@ -146,8 +147,8 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes /** * @return Owned values map. */ - public Map<GridCacheTxKey<K>, GridTuple3<GridCacheVersion, V, byte[]>> ownedValues() { - return ownedVals == null ? Collections.<GridCacheTxKey<K>, GridTuple3<GridCacheVersion,V,byte[]>>emptyMap() : + public Map<IgniteTxKey<K>, GridTuple3<GridCacheVersion, V, byte[]>> ownedValues() { + return ownedVals == null ? Collections.<IgniteTxKey<K>, GridTuple3<GridCacheVersion,V,byte[]>>emptyMap() : Collections.unmodifiableMap(ownedVals); } @@ -155,7 +156,7 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes * @param key Key. * @return {@code True} if response has owned value for given key. */ - public boolean hasOwnedValue(GridCacheTxKey<K> key) { + public boolean hasOwnedValue(IgniteTxKey<K> key) { return ownedVals != null && ownedVals.containsKey(key); } @@ -174,7 +175,7 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes if (ownedVals != null && ownedValsBytes == null) { ownedValsBytes = new ArrayList<>(ownedVals.size()); - for (Map.Entry<GridCacheTxKey<K>, GridTuple3<GridCacheVersion, V, byte[]>> entry : ownedVals.entrySet()) { + for (Map.Entry<IgniteTxKey<K>, GridTuple3<GridCacheVersion, V, byte[]>> entry : ownedVals.entrySet()) { GridTuple3<GridCacheVersion, V, byte[]> tup = entry.getValue(); boolean rawBytes = false; @@ -204,7 +205,7 @@ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareRes ownedVals = new HashMap<>(); for (byte[] bytes : ownedValsBytes) { - GridTuple4<GridCacheTxKey<K>, GridCacheVersion, byte[], Boolean> tup = ctx.marshaller().unmarshal(bytes, ldr); + GridTuple4<IgniteTxKey<K>, GridCacheVersion, byte[], Boolean> tup = ctx.marshaller().unmarshal(bytes, ldr); V val = tup.get4() ? (V)tup.get3() : ctx.marshaller().<V>unmarshal(tup.get3(), ldr); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java index 99d17eb..fbec6dd 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java @@ -13,6 +13,7 @@ import org.apache.ignite.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.*; +import org.gridgain.grid.kernal.processors.cache.transactions.*; import org.gridgain.grid.util.*; import org.gridgain.grid.util.tostring.*; import org.gridgain.grid.util.typedef.*; @@ -32,7 +33,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> private static final long serialVersionUID = 0L; /** Evicted keys. */ - private Collection<GridCacheTxKey<K>> evicted = new LinkedList<>(); + private Collection<IgniteTxKey<K>> evicted = new LinkedList<>(); /** Near node ID. */ private UUID nearNodeId; @@ -41,7 +42,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> private GridCacheVersion nearXidVer; /** Owned versions. */ - private Map<GridCacheTxKey<K>, GridCacheVersion> owned; + private Map<IgniteTxKey<K>, GridCacheVersion> owned; /** Group lock flag. */ private boolean grpLock; @@ -86,9 +87,9 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> IgniteTxIsolation isolation, boolean invalidate, long timeout, - Collection<GridCacheTxEntry<K, V>> writeEntries, + Collection<IgniteTxEntry<K, V>> writeEntries, int txSize, - @Nullable GridCacheTxKey grpLockKey, + @Nullable IgniteTxKey grpLockKey, @Nullable UUID subjId, int taskNameHash ) throws IgniteCheckedException { @@ -105,7 +106,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> writeEntries != null ? Math.max(txSize, writeEntries.size()) : txSize, 1.0f); if (writeEntries != null) - for (GridCacheTxEntry<K, V> entry : writeEntries) { + for (IgniteTxEntry<K, V> entry : writeEntries) { entry.unmarshal(ctx, true, ldr); addEntry(entry); @@ -144,7 +145,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> boolean invalidate, long timeout, int txSize, - @Nullable GridCacheTxKey grpLockKey, + @Nullable IgniteTxKey grpLockKey, @Nullable UUID subjId, int taskNameHash ) { @@ -176,7 +177,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> } /** {@inheritDoc} */ - @Override public GridCacheVersion ownedVersion(GridCacheTxKey<K> key) { + @Override public GridCacheVersion ownedVersion(IgniteTxKey<K> key) { return owned == null ? null : owned.get(key); } @@ -205,7 +206,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> * * @param vers Map of owned versions. */ - public void ownedVersions(Map<GridCacheTxKey<K>, GridCacheVersion> vers) { + public void ownedVersions(Map<IgniteTxKey<K>, GridCacheVersion> vers) { if (F.isEmpty(vers)) return; @@ -230,7 +231,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> /** * @return Evicted keys. */ - public Collection<GridCacheTxKey<K>> evicted() { + public Collection<IgniteTxKey<K>> evicted() { return evicted; } @@ -239,7 +240,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> * * @param key Evicted key. */ - public void addEvicted(GridCacheTxKey<K> key) { + public void addEvicted(IgniteTxKey<K> key) { evicted.add(key); } @@ -250,8 +251,8 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> * @param entries Entries to add. * @throws IgniteCheckedException If failed. */ - public void addEntries(ClassLoader ldr, Iterable<GridCacheTxEntry<K, V>> entries) throws IgniteCheckedException { - for (GridCacheTxEntry<K, V> entry : entries) { + public void addEntries(ClassLoader ldr, Iterable<IgniteTxEntry<K, V>> entries) throws IgniteCheckedException { + for (IgniteTxEntry<K, V> entry : entries) { entry.unmarshal(cctx, true, ldr); addEntry(entry); @@ -263,7 +264,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> * @throws IgniteCheckedException If failed. * @return {@code True} if entry was enlisted. */ - private boolean addEntry(GridCacheTxEntry<K, V> entry) throws IgniteCheckedException { + private boolean addEntry(IgniteTxEntry<K, V> entry) throws IgniteCheckedException { checkInternal(entry.txKey()); GridCacheContext<K, V> cacheCtx = entry.context(); @@ -320,7 +321,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> */ public boolean addEntry( GridCacheContext<K, V> cacheCtx, - GridCacheTxKey<K> key, + IgniteTxKey<K> key, byte[] keyBytes, GridCacheOperation op, V val, @@ -348,7 +349,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> return false; } else { - GridCacheTxEntry<K, V> txEntry = new GridCacheTxEntry<>(cacheCtx, this, op, val, 0L, -1L, cached, + IgniteTxEntry<K, V> txEntry = new IgniteTxEntry<>(cacheCtx, this, op, val, 0L, -1L, cached, drVer); txEntry.keyBytes(keyBytes); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/GridCacheDrManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/GridCacheDrManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/GridCacheDrManager.java index 439a575..8915fc1 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/GridCacheDrManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/GridCacheDrManager.java @@ -11,6 +11,7 @@ package org.gridgain.grid.kernal.processors.cache.dr; import org.apache.ignite.*; import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.cache.transactions.*; import org.gridgain.grid.kernal.processors.dr.*; import org.jetbrains.annotations.*; @@ -64,7 +65,7 @@ public interface GridCacheDrManager<K, V> extends GridCacheManager<K, V> { */ public GridDrResolveResult<V> resolveTx( GridCacheEntryEx<K, V> e, - GridCacheTxEntry<K, V> txEntry, + IgniteTxEntry<K, V> txEntry, GridCacheVersion newVer, GridCacheOperation op, V newVal, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/os/GridOsCacheDrManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/os/GridOsCacheDrManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/os/GridOsCacheDrManager.java index cb3f407..702dd33 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/os/GridOsCacheDrManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/dr/os/GridOsCacheDrManager.java @@ -12,6 +12,7 @@ package org.gridgain.grid.kernal.processors.cache.dr.os; import org.apache.ignite.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.dr.*; +import org.gridgain.grid.kernal.processors.cache.transactions.*; import org.gridgain.grid.kernal.processors.dr.*; import org.jetbrains.annotations.*; @@ -80,7 +81,7 @@ public class GridOsCacheDrManager<K, V> implements GridCacheDrManager<K, V> { /** {@inheritDoc} */ @Override public GridDrResolveResult<V> resolveTx(GridCacheEntryEx<K, V> e, - GridCacheTxEntry<K, V> txEntry, + IgniteTxEntry<K, V> txEntry, GridCacheVersion newVer, GridCacheOperation op, V newVal, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java index 66fffae..9ea91b9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCache.java @@ -14,6 +14,7 @@ import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.cache.transactions.*; import org.gridgain.grid.util.future.*; import org.gridgain.grid.util.typedef.*; import org.jetbrains.annotations.*; @@ -86,7 +87,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> txLockAsync(Collection<? extends K> keys, long timeout, - GridCacheTxLocalEx<K, V> tx, boolean isRead, + IgniteTxLocalEx<K, V> tx, boolean isRead, boolean retval, IgniteTxIsolation isolation, boolean invalidate, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { return lockAllAsync(keys, timeout, tx, filter); @@ -95,7 +96,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { - GridCacheTxLocalEx<K, V> tx = ctx.tm().localTx(); + IgniteTxLocalEx<K, V> tx = ctx.tm().localTx(); return lockAllAsync(keys, timeout, tx, filter); } @@ -108,7 +109,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { * @return Future. */ public IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, - @Nullable GridCacheTxLocalEx<K, V> tx, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { + @Nullable IgniteTxLocalEx<K, V> tx, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { if (F.isEmpty(keys)) return new GridFinishedFuture<>(ctx.kernalContext(), true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCacheEntry.java index f5ff9aa..58b4cf0 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCacheEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalCacheEntry.java @@ -10,6 +10,7 @@ package org.gridgain.grid.kernal.processors.cache.local; import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.cache.transactions.*; import org.jetbrains.annotations.*; import static org.apache.ignite.events.IgniteEventType.*; @@ -170,7 +171,7 @@ public class GridLocalCacheEntry<K, V> extends GridCacheMapEntry<K, V> { } /** {@inheritDoc} */ - @Override public boolean tmLock(GridCacheTxEx<K, V> tx, long timeout) throws GridCacheEntryRemovedException { + @Override public boolean tmLock(IgniteTxEx<K, V> tx, long timeout) throws GridCacheEntryRemovedException { GridCacheMvccCandidate<K> cand = addLocal( tx.threadId(), tx.xidVersion(), @@ -266,7 +267,7 @@ public class GridLocalCacheEntry<K, V> extends GridCacheMapEntry<K, V> { * * @param tx Transaction to unlock. */ - @Override public void txUnlock(GridCacheTxEx<K, V> tx) throws GridCacheEntryRemovedException { + @Override public void txUnlock(IgniteTxEx<K, V> tx) throws GridCacheEntryRemovedException { removeLock(tx.xidVersion()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalLockFuture.java index 09bb220..25a13c5 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalLockFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalLockFuture.java @@ -12,9 +12,9 @@ package org.gridgain.grid.kernal.processors.cache.local; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.cache.transactions.*; import org.gridgain.grid.kernal.processors.timeout.*; import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.grid.util.future.*; @@ -76,7 +76,7 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean> private IgnitePredicate<GridCacheEntry<K, V>>[] filter; /** Transaction. */ - private GridCacheTxLocalEx<K, V> tx; + private IgniteTxLocalEx<K, V> tx; /** Trackable flag. */ private boolean trackable = true; @@ -99,7 +99,7 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean> GridLocalLockFuture( GridCacheContext<K, V> cctx, Collection<? extends K> keys, - GridCacheTxLocalEx<K, V> tx, + IgniteTxLocalEx<K, V> tx, GridLocalCache<K, V> cache, long timeout, IgnitePredicate<GridCacheEntry<K, V>>[] filter) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java index 8ceb402..0226ff2 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java @@ -13,6 +13,7 @@ import org.apache.ignite.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.cache.transactions.*; import org.gridgain.grid.util.future.*; import org.gridgain.grid.util.tostring.*; import org.jetbrains.annotations.*; @@ -26,7 +27,7 @@ import static org.apache.ignite.transactions.IgniteTxState.*; /** * Local cache transaction. */ -class GridLocalTx<K, V> extends GridCacheTxLocalAdapter<K, V> { +class GridLocalTx<K, V> extends IgniteTxLocalAdapter<K, V> { /** */ private static final long serialVersionUID = 0L; @@ -65,7 +66,7 @@ class GridLocalTx<K, V> extends GridCacheTxLocalAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<GridCacheTxEx<K, V>> future() { + @Override public IgniteFuture<IgniteTxEx<K, V>> future() { return fut.get(); } @@ -103,11 +104,11 @@ class GridLocalTx<K, V> extends GridCacheTxLocalAdapter<K, V> { } /** {@inheritDoc} */ - @Override public IgniteFuture<GridCacheTxEx<K, V>> prepareAsync() { + @Override public IgniteFuture<IgniteTxEx<K, V>> prepareAsync() { try { prepare(); - return new GridFinishedFuture<GridCacheTxEx<K, V>>(cctx.kernalContext(), this); + return new GridFinishedFuture<IgniteTxEx<K, V>>(cctx.kernalContext(), this); } catch (IgniteCheckedException e) { return new GridFinishedFuture<>(cctx.kernalContext(), e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTxFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTxFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTxFuture.java index e588cba..ff1682d 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTxFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTxFuture.java @@ -14,6 +14,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.cache.transactions.*; import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.grid.util.future.*; import org.gridgain.grid.util.tostring.*; @@ -27,8 +28,8 @@ import static org.apache.ignite.transactions.IgniteTxState.*; /** * Replicated cache transaction future. */ -final class GridLocalTxFuture<K, V> extends GridFutureAdapter<GridCacheTxEx<K, V>> - implements GridCacheMvccFuture<K, V, GridCacheTxEx<K, V>> { +final class GridLocalTxFuture<K, V> extends GridFutureAdapter<IgniteTxEx<K, V>> + implements GridCacheMvccFuture<K, V, IgniteTxEx<K, V>> { /** */ private static final long serialVersionUID = 0L; @@ -177,7 +178,7 @@ final class GridLocalTxFuture<K, V> extends GridFutureAdapter<GridCacheTxEx<K, V */ @SuppressWarnings({"ThrowableInstanceNeverThrown"}) void checkLocks() { - for (GridCacheTxEntry<K, V> txEntry : tx.writeMap().values()) { + for (IgniteTxEntry<K, V> txEntry : tx.writeMap().values()) { while (true) { try { GridCacheEntryEx<K, V> entry = txEntry.cached(); @@ -226,7 +227,7 @@ final class GridLocalTxFuture<K, V> extends GridFutureAdapter<GridCacheTxEx<K, V if (log.isDebugEnabled()) log.debug("Transaction future received owner changed callback [owner=" + owner + ", entry=" + entry + ']'); - for (GridCacheTxEntry<K, V> txEntry : tx.writeMap().values()) { + for (IgniteTxEntry<K, V> txEntry : tx.writeMap().values()) { while (true) { try { GridCacheEntryEx<K,V> cached = txEntry.cached(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java index 3a63fa1..be53cb3 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -16,6 +16,7 @@ import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.local.*; +import org.gridgain.grid.kernal.processors.cache.transactions.*; import org.gridgain.grid.util.*; import org.gridgain.grid.util.future.*; import org.gridgain.grid.util.typedef.*; @@ -1248,7 +1249,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> txLockAsync(Collection<? extends K> keys, long timeout, - GridCacheTxLocalEx<K, V> tx, + IgniteTxLocalEx<K, V> tx, boolean isRead, boolean retval, IgniteTxIsolation isolation, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java index 7e132a4..2f36aa8 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java @@ -116,7 +116,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { throw new IllegalArgumentException("SERIALIZABLE isolation level is disabled (to enable change " + "'txSerializableEnabled' configuration property)"); - GridCacheTxEx<K, V> tx = (GridCacheTxEx<K, V>)cctx.tm().userTx(); + IgniteTxEx<K, V> tx = (IgniteTxEx<K, V>)cctx.tm().userTx(); if (tx != null) throw new IllegalStateException("Failed to start new transaction " + @@ -194,7 +194,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { throw new IllegalStateException("Failed to start new transaction " + "(current thread already has a transaction): " + tx); - GridCacheTxLocalAdapter<K, V> tx0 = cctx.tm().newTx( + IgniteTxLocalAdapter<K, V> tx0 = cctx.tm().newTx( false, false, sys, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java new file mode 100644 index 0000000..a9d28f4 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxAdapter.java @@ -0,0 +1,1523 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache.transactions; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.cache.distributed.near.*; +import org.gridgain.grid.util.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.gridgain.grid.util.future.*; +import org.gridgain.grid.util.lang.*; +import org.gridgain.grid.util.tostring.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +import static org.apache.ignite.events.IgniteEventType.*; +import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; +import static org.apache.ignite.transactions.IgniteTxState.*; +import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*; + +/** + * Managed transaction adapter. + */ +public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter + implements IgniteTxEx<K, V>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Static logger to avoid re-creation. */ + private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** Logger. */ + protected static IgniteLogger log; + + /** Transaction ID. */ + @GridToStringInclude + protected GridCacheVersion xidVer; + + /** Entries write version. */ + @GridToStringInclude + protected GridCacheVersion writeVer; + + /** Implicit flag. */ + @GridToStringInclude + protected boolean implicit; + + /** Implicit with one key flag. */ + @GridToStringInclude + protected boolean implicitSingle; + + /** Local flag. */ + @GridToStringInclude + protected boolean loc; + + /** Thread ID. */ + @GridToStringInclude + protected long threadId; + + /** Transaction start time. */ + @GridToStringInclude + protected long startTime = U.currentTimeMillis(); + + /** Node ID. */ + @GridToStringInclude + protected UUID nodeId; + + /** Transaction counter value at the start of transaction. */ + @GridToStringInclude + protected GridCacheVersion startVer; + + /** Cache registry. */ + @GridToStringExclude + protected GridCacheSharedContext<K, V> cctx; + + /** + * End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>) + * assigned to this transaction at the end of write phase. + */ + @GridToStringInclude + protected GridCacheVersion endVer; + + /** Isolation. */ + @GridToStringInclude + protected IgniteTxIsolation isolation = READ_COMMITTED; + + /** Concurrency. */ + @GridToStringInclude + protected IgniteTxConcurrency concurrency = PESSIMISTIC; + + /** Transaction timeout. */ + @GridToStringInclude + protected long timeout; + + /** Invalidate flag. */ + protected volatile boolean invalidate; + + /** Invalidation flag for system invalidations (not user-based ones). */ + private boolean sysInvalidate; + + /** Internal flag. */ + protected boolean internal; + + /** System transaction flag. */ + private boolean sys; + + /** */ + protected boolean onePhaseCommit; + + /** */ + protected boolean syncCommit; + + /** */ + protected boolean syncRollback; + + /** If this transaction contains transform entries. */ + protected boolean transform; + + /** Commit version. */ + private AtomicReference<GridCacheVersion> commitVer = new AtomicReference<>(null); + + /** Done marker. */ + protected final AtomicBoolean isDone = new AtomicBoolean(false); + + /** */ + private AtomicReference<FinalizationStatus> finalizing = new AtomicReference<>(FinalizationStatus.NONE); + + /** Preparing flag. */ + private AtomicBoolean preparing = new AtomicBoolean(); + + /** */ + private Set<Integer> invalidParts = new GridLeanSet<>(); + + /** Recover writes. */ + private Collection<IgniteTxEntry<K, V>> recoveryWrites; + + /** + * Transaction state. Note that state is not protected, as we want to + * always use {@link #state()} and {@link #state(IgniteTxState)} + * methods. + */ + @GridToStringInclude + private volatile IgniteTxState state = ACTIVE; + + /** Timed out flag. */ + private volatile boolean timedOut; + + /** */ + protected int txSize; + + /** Group lock key, if any. */ + protected IgniteTxKey grpLockKey; + + /** */ + @GridToStringExclude + private AtomicReference<GridFutureAdapter<IgniteTx>> finFut = new AtomicReference<>(); + + /** Topology version. */ + private AtomicLong topVer = new AtomicLong(-1); + + /** Mutex. */ + private final Lock lock = new ReentrantLock(); + + /** Lock condition. */ + private final Condition cond = lock.newCondition(); + + /** Subject ID initiated this transaction. */ + protected UUID subjId; + + /** Task name hash code. */ + protected int taskNameHash; + + /** Task name. */ + protected String taskName; + + /** Store used flag. */ + protected boolean storeEnabled = true; + + /** + * Empty constructor required for {@link Externalizable}. + */ + protected IgniteTxAdapter() { + // No-op. + } + + /** + * @param cctx Cache registry. + * @param xidVer Transaction ID. + * @param implicit Implicit flag. + * @param implicitSingle Implicit with one key flag. + * @param loc Local flag. + * @param sys System transaction flag. + * @param concurrency Concurrency. + * @param isolation Isolation. + * @param timeout Timeout. + * @param txSize Transaction size. + * @param grpLockKey Group lock key if this is group-lock transaction. + */ + protected IgniteTxAdapter( + GridCacheSharedContext<K, V> cctx, + GridCacheVersion xidVer, + boolean implicit, + boolean implicitSingle, + boolean loc, + boolean sys, + IgniteTxConcurrency concurrency, + IgniteTxIsolation isolation, + long timeout, + boolean invalidate, + boolean storeEnabled, + int txSize, + @Nullable IgniteTxKey grpLockKey, + @Nullable UUID subjId, + int taskNameHash + ) { + assert xidVer != null; + assert cctx != null; + + this.cctx = cctx; + this.xidVer = xidVer; + this.implicit = implicit; + this.implicitSingle = implicitSingle; + this.loc = loc; + this.sys = sys; + this.concurrency = concurrency; + this.isolation = isolation; + this.timeout = timeout; + this.invalidate = invalidate; + this.storeEnabled = storeEnabled; + this.txSize = txSize; + this.grpLockKey = grpLockKey; + this.subjId = subjId; + this.taskNameHash = taskNameHash; + + startVer = cctx.versions().last(); + + nodeId = cctx.discovery().localNode().id(); + + threadId = Thread.currentThread().getId(); + + log = U.logger(cctx.kernalContext(), logRef, this); + } + + /** + * @param cctx Cache registry. + * @param nodeId Node ID. + * @param xidVer Transaction ID. + * @param startVer Start version mark. + * @param threadId Thread ID. + * @param sys System transaction flag. + * @param concurrency Concurrency. + * @param isolation Isolation. + * @param timeout Timeout. + * @param txSize Transaction size. + * @param grpLockKey Group lock key if this is group-lock transaction. + */ + protected IgniteTxAdapter( + GridCacheSharedContext<K, V> cctx, + UUID nodeId, + GridCacheVersion xidVer, + GridCacheVersion startVer, + long threadId, + boolean sys, + IgniteTxConcurrency concurrency, + IgniteTxIsolation isolation, + long timeout, + int txSize, + @Nullable IgniteTxKey grpLockKey, + @Nullable UUID subjId, + int taskNameHash + ) { + this.cctx = cctx; + this.nodeId = nodeId; + this.threadId = threadId; + this.xidVer = xidVer; + this.startVer = startVer; + this.sys = sys; + this.concurrency = concurrency; + this.isolation = isolation; + this.timeout = timeout; + this.txSize = txSize; + this.grpLockKey = grpLockKey; + this.subjId = subjId; + this.taskNameHash = taskNameHash; + + implicit = false; + implicitSingle = false; + loc = false; + + log = U.logger(cctx.kernalContext(), logRef, this); + } + + /** + * Acquires lock. + */ + @SuppressWarnings({"LockAcquiredButNotSafelyReleased"}) + protected final void lock() { + lock.lock(); + } + + /** + * Releases lock. + */ + protected final void unlock() { + lock.unlock(); + } + + /** + * Signals all waiters. + */ + protected final void signalAll() { + cond.signalAll(); + } + + /** + * Waits for signal. + * + * @throws InterruptedException If interrupted. + */ + protected final void awaitSignal() throws InterruptedException { + cond.await(); + } + + /** + * Checks whether near cache should be updated. + * + * @return Flag indicating whether near cache should be updated. + */ + protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, long topVer) { + return false; + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteTxEntry<K, V>> optimisticLockEntries() { + assert optimistic(); + + if (!groupLock()) + return writeEntries(); + else { + if (!F.isEmpty(invalidParts)) { + assert invalidParts.size() == 1 : "Only one partition expected for group lock transaction " + + "[tx=" + this + ", invalidParts=" + invalidParts + ']'; + assert groupLockEntry() == null : "Group lock key should be rejected " + + "[tx=" + this + ", groupLockEntry=" + groupLockEntry() + ']'; + assert F.isEmpty(writeMap()) : "All entries should be rejected for group lock transaction " + + "[tx=" + this + ", writes=" + writeMap() + ']'; + + return Collections.emptyList(); + } + + IgniteTxEntry<K, V> grpLockEntry = groupLockEntry(); + + assert grpLockEntry != null || (near() && !local()): + "Group lock entry was not enlisted into transaction [tx=" + this + + ", grpLockKey=" + groupLockKey() + ']'; + + return grpLockEntry == null ? + Collections.<IgniteTxEntry<K,V>>emptyList() : + Collections.singletonList(grpLockEntry); + } + } + + /** + * @param recoveryWrites Recover write entries. + */ + public void recoveryWrites(Collection<IgniteTxEntry<K, V>> recoveryWrites) { + this.recoveryWrites = recoveryWrites; + } + + /** + * @return Recover write entries. + */ + @Override public Collection<IgniteTxEntry<K, V>> recoveryWrites() { + return recoveryWrites; + } + + /** {@inheritDoc} */ + @Override public boolean storeEnabled() { + return storeEnabled; + } + + /** + * @param storeEnabled Store enabled flag. + */ + public void storeEnabled(boolean storeEnabled) { + this.storeEnabled = storeEnabled; + } + + /** {@inheritDoc} */ + @Override public boolean system() { + return sys; + } + + /** {@inheritDoc} */ + @Override public boolean storeUsed() { + return storeEnabled() && store() != null; + } + + /** + * Store manager for current transaction. + * + * @return Store manager. + */ + protected GridCacheStoreManager<K, V> store() { + if (!activeCacheIds().isEmpty()) { + int cacheId = F.first(activeCacheIds()); + + GridCacheStoreManager<K, V> store = cctx.cacheContext(cacheId).store(); + + return store.configured() ? store : null; + } + + return null; + } + + /** + * This method uses unchecked assignment to cast group lock key entry to transaction generic signature. + * + * @return Group lock tx entry. + */ + @SuppressWarnings("unchecked") + public IgniteTxEntry<K, V> groupLockEntry() { + return ((IgniteTxAdapter)this).entry(groupLockKey()); + } + + /** {@inheritDoc} */ + @Override public UUID otherNodeId() { + return null; + } + + /** {@inheritDoc} */ + @Override public UUID subjectId() { + if (subjId != null) + return subjId; + + return originatingNodeId(); + } + + /** {@inheritDoc} */ + @Override public int taskNameHash() { + return taskNameHash; + } + + /** {@inheritDoc} */ + @Override public long topologyVersion() { + long res = topVer.get(); + + if (res == -1) + return cctx.exchange().topologyVersion(); + + return res; + } + + /** {@inheritDoc} */ + @Override public long topologyVersion(long topVer) { + this.topVer.compareAndSet(-1, topVer); + + return this.topVer.get(); + } + + /** {@inheritDoc} */ + @Override public boolean hasTransforms() { + return transform; + } + + /** {@inheritDoc} */ + @Override public boolean markPreparing() { + return preparing.compareAndSet(false, true); + } + + /** + * @return {@code True} if marked. + */ + @Override public boolean markFinalizing(FinalizationStatus status) { + boolean res; + + switch (status) { + case USER_FINISH: + res = finalizing.compareAndSet(FinalizationStatus.NONE, FinalizationStatus.USER_FINISH); + + break; + + case RECOVERY_WAIT: + finalizing.compareAndSet(FinalizationStatus.NONE, FinalizationStatus.RECOVERY_WAIT); + + FinalizationStatus cur = finalizing.get(); + + res = cur == FinalizationStatus.RECOVERY_WAIT || cur == FinalizationStatus.RECOVERY_FINISH; + + break; + + case RECOVERY_FINISH: + FinalizationStatus old = finalizing.get(); + + res = old != FinalizationStatus.USER_FINISH && finalizing.compareAndSet(old, status); + + break; + + default: + throw new IllegalArgumentException("Cannot set finalization status: " + status); + + } + + if (res) { + if (log.isDebugEnabled()) + log.debug("Marked transaction as finalized: " + this); + } + else { + if (log.isDebugEnabled()) + log.debug("Transaction was not marked finalized: " + this); + } + + return res; + } + + /** + * @return Finalization status. + */ + protected FinalizationStatus finalizationStatus() { + return finalizing.get(); + } + + /** + * @return {@code True} if transaction has at least one key enlisted. + */ + public abstract boolean isStarted(); + + /** {@inheritDoc} */ + @Override public boolean groupLock() { + return grpLockKey != null; + } + + /** {@inheritDoc} */ + @Override public IgniteTxKey groupLockKey() { + return grpLockKey; + } + + /** {@inheritDoc} */ + @Override public int size() { + return txSize; + } + + /** + * @return Logger. + */ + protected IgniteLogger log() { + return log; + } + + /** {@inheritDoc} */ + @Override public boolean near() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean implicit() { + return implicit; + } + + /** {@inheritDoc} */ + @Override public boolean implicitSingle() { + return implicitSingle; + } + + /** {@inheritDoc} */ + @Override public boolean local() { + return loc; + } + + /** {@inheritDoc} */ + @Override public final boolean user() { + return !implicit() && local() && !dht() && !internal(); + } + + /** {@inheritDoc} */ + @Override public boolean dht() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean colocated() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean replicated() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean enforceSerializable() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean syncCommit() { + return syncCommit; + } + + /** {@inheritDoc} */ + @Override public boolean syncRollback() { + return syncRollback; + } + + /** + * @param syncCommit Synchronous commit flag. + */ + public void syncCommit(boolean syncCommit) { + this.syncCommit = syncCommit; + } + + /** + * @param syncRollback Synchronous rollback flag. + */ + public void syncRollback(boolean syncRollback) { + this.syncRollback = syncRollback; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid xid() { + return xidVer.asGridUuid(); + } + + /** {@inheritDoc} */ + @Override public Set<Integer> invalidPartitions() { + return invalidParts; + } + + /** {@inheritDoc} */ + @Override public void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int part) { + invalidParts.add(part); + + if (log.isDebugEnabled()) + log.debug("Added invalid partition for transaction [part=" + part + ", tx=" + this + ']'); + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion ownedVersion(IgniteTxKey<K> key) { + return null; + } + + /** {@inheritDoc} */ + @Override public long startTime() { + return startTime; + } + + /** + * Gets remaining allowed transaction time. + * + * @return Remaining transaction time. + */ + @Override public long remainingTime() { + if (timeout() <= 0) + return -1; + + long timeLeft = timeout() - (U.currentTimeMillis() - startTime()); + + if (timeLeft < 0) + return 0; + + return timeLeft; + } + + /** + * @return Lock timeout. + */ + protected long lockTimeout() { + long timeout = remainingTime(); + + return timeout < 0 ? 0 : timeout == 0 ? -1 : timeout; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion xidVersion() { + return xidVer; + } + + /** {@inheritDoc} */ + @Override public long threadId() { + return threadId; + } + + /** {@inheritDoc} */ + @Override public UUID nodeId() { + return nodeId; + } + + /** {@inheritDoc} */ + @Override public IgniteTxIsolation isolation() { + return isolation; + } + + /** {@inheritDoc} */ + @Override public IgniteTxConcurrency concurrency() { + return concurrency; + } + + /** {@inheritDoc} */ + @Override public long timeout() { + return timeout; + } + + /** {@inheritDoc} */ + @Override public long timeout(long timeout) { + if (isStarted()) + throw new IllegalStateException("Cannot change timeout after transaction has started: " + this); + + long old = this.timeout; + + this.timeout = timeout; + + return old; + } + + /** {@inheritDoc} */ + @SuppressWarnings("SimplifiableIfStatement") + @Override public boolean ownsLock(GridCacheEntryEx<K, V> entry) throws GridCacheEntryRemovedException { + GridCacheContext<K, V> cacheCtx = entry.context(); + + IgniteTxEntry<K, V> txEntry = entry(entry.txKey()); + + GridCacheVersion explicit = txEntry == null ? null : txEntry.explicitVersion(); + + assert !txEntry.groupLockEntry() || groupLock() : "Can not have group-locked tx entries in " + + "non-group-lock transactions [txEntry=" + txEntry + ", tx=" + this + ']'; + + return local() && !cacheCtx.isDht() ? + entry.lockedByThread(threadId()) || (explicit != null && entry.lockedBy(explicit)) : + // If candidate is not there, then lock was explicit. + // Otherwise, check if entry is owned by version. + !entry.hasLockCandidate(xidVersion()) || entry.lockedBy(xidVersion()); + } + + /** {@inheritDoc} */ + @SuppressWarnings("SimplifiableIfStatement") + @Override public boolean ownsLockUnsafe(GridCacheEntryEx<K, V> entry) { + GridCacheContext<K, V> cacheCtx = entry.context(); + + IgniteTxEntry<K, V> txEntry = entry(entry.txKey()); + + GridCacheVersion explicit = txEntry == null ? null : txEntry.explicitVersion(); + + assert !txEntry.groupLockEntry() || groupLock() : "Can not have group-locked tx entries in " + + "non-group-lock transactions [txEntry=" + txEntry + ", tx=" + this + ']'; + + return local() && !cacheCtx.isDht() ? + entry.lockedByThreadUnsafe(threadId()) || (explicit != null && entry.lockedByUnsafe(explicit)) : + // If candidate is not there, then lock was explicit. + // Otherwise, check if entry is owned by version. + !entry.hasLockCandidateUnsafe(xidVersion()) || entry.lockedByUnsafe(xidVersion()); + } + + /** {@inheritDoc} */ + @Override public IgniteTxState state() { + return state; + } + + /** {@inheritDoc} */ + @Override public boolean setRollbackOnly() { + return state(MARKED_ROLLBACK); + } + + /** + * @return {@code True} if rollback only flag is set. + */ + @Override public boolean isRollbackOnly() { + return state == MARKED_ROLLBACK || state == ROLLING_BACK || state == ROLLED_BACK; + } + + /** {@inheritDoc} */ + @Override public boolean done() { + return isDone.get(); + } + + /** + * @return Commit version. + */ + @Override public GridCacheVersion commitVersion() { + initCommitVersion(); + + return commitVer.get(); + } + + /** + * @param commitVer Commit version. + * @return {@code True} if set to not null value. + */ + @Override public boolean commitVersion(GridCacheVersion commitVer) { + return commitVer != null && this.commitVer.compareAndSet(null, commitVer); + } + + /** + * + */ + public void initCommitVersion() { + if (commitVer.get() == null) + commitVer.compareAndSet(null, xidVer); + } + + /** + * + */ + @Override public void close() throws IgniteCheckedException { + IgniteTxState state = state(); + + if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED) + rollback(); + + awaitCompletion(); + } + + /** {@inheritDoc} */ + @Override public boolean needsCompletedVersions() { + return false; + } + + /** {@inheritDoc} */ + @Override public void completedVersions(GridCacheVersion base, Collection<GridCacheVersion> committed, + Collection<GridCacheVersion> txs) { + /* No-op. */ + } + + /** + * Awaits transaction completion. + * + * @throws IgniteCheckedException If waiting failed. + */ + protected void awaitCompletion() throws IgniteCheckedException { + lock(); + + try { + while (!done()) + awaitSignal(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + if (!done()) + throw new IgniteCheckedException("Got interrupted while waiting for transaction to complete: " + this, e); + } + finally { + unlock(); + } + } + + /** {@inheritDoc} */ + @Override public boolean internal() { + return internal; + } + + /** + * @param key Key. + * @return {@code True} if key is internal. + */ + protected boolean checkInternal(IgniteTxKey<K> key) { + if (key.key() instanceof GridCacheInternal) { + internal = true; + + return true; + } + + return false; + } + + /** + * @param onePhaseCommit {@code True} if transaction commit should be performed in short-path way. + */ + public void onePhaseCommit(boolean onePhaseCommit) { + this.onePhaseCommit = onePhaseCommit; + } + + /** + * @return Fast commit flag. + */ + @Override public boolean onePhaseCommit() { + return onePhaseCommit; + } + + /** {@inheritDoc} */ + @Override public boolean optimistic() { + return concurrency == OPTIMISTIC; + } + + /** {@inheritDoc} */ + @Override public boolean pessimistic() { + return concurrency == PESSIMISTIC; + } + + /** {@inheritDoc} */ + @Override public boolean serializable() { + return isolation == SERIALIZABLE; + } + + /** {@inheritDoc} */ + @Override public boolean repeatableRead() { + return isolation == REPEATABLE_READ; + } + + /** {@inheritDoc} */ + @Override public boolean readCommitted() { + return isolation == READ_COMMITTED; + } + + /** {@inheritDoc} */ + @Override public boolean state(IgniteTxState state) { + return state(state, false); + } + + /** {@inheritDoc} */ + @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") + @Override public IgniteFuture<IgniteTx> finishFuture() { + GridFutureAdapter<IgniteTx> fut = finFut.get(); + + if (fut == null) { + fut = new GridFutureAdapter<IgniteTx>(cctx.kernalContext()) { + @Override public String toString() { + return S.toString(GridFutureAdapter.class, this, "tx", IgniteTxAdapter.this); + } + }; + + if (!finFut.compareAndSet(null, fut)) + fut = finFut.get(); + } + + assert fut != null; + + if (isDone.get()) + fut.onDone(this); + + return fut; + } + + /** + * + * @param state State to set. + * @param timedOut Timeout flag. + * @return {@code True} if state changed. + */ + @SuppressWarnings({"TooBroadScope"}) + private boolean state(IgniteTxState state, boolean timedOut) { + boolean valid = false; + + IgniteTxState prev; + + boolean notify = false; + + lock(); + + try { + prev = this.state; + + switch (state) { + case ACTIVE: { + valid = false; + + break; + } // Active is initial state and cannot be transitioned to. + case PREPARING: { + valid = prev == ACTIVE; + + break; + } + case PREPARED: { + valid = prev == PREPARING; + + break; + } + case COMMITTING: { + valid = prev == PREPARED; + + break; + } + + case UNKNOWN: { + if (isDone.compareAndSet(false, true)) + notify = true; + + valid = prev == ROLLING_BACK || prev == COMMITTING; + + break; + } + + case COMMITTED: { + if (isDone.compareAndSet(false, true)) + notify = true; + + valid = prev == COMMITTING; + + break; + } + + case ROLLED_BACK: { + if (isDone.compareAndSet(false, true)) + notify = true; + + valid = prev == ROLLING_BACK; + + break; + } + + case MARKED_ROLLBACK: { + valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED || prev == COMMITTING; + + break; + } + + case ROLLING_BACK: { + valid = + prev == ACTIVE || prev == MARKED_ROLLBACK || prev == PREPARING || + prev == PREPARED || (prev == COMMITTING && local() && !dht()); + + break; + } + } + + if (valid) { + this.state = state; + this.timedOut = timedOut; + + if (log.isDebugEnabled()) + log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']'); + + // Notify of state change. + signalAll(); + } + else { + if (log.isDebugEnabled()) + log.debug("Invalid transaction state transition [invalid=" + state + ", cur=" + this.state + + ", tx=" + this + ']'); + } + } + finally { + unlock(); + } + + if (notify) { + GridFutureAdapter<IgniteTx> fut = finFut.get(); + + if (fut != null) + fut.onDone(this); + } + + if (valid) { + // Seal transactions maps. + if (state != ACTIVE) + seal(); + + cctx.tm().onTxStateChange(prev, state, this); + } + + return valid; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion startVersion() { + return startVer; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion endVersion() { + return endVer; + } + + /** {@inheritDoc} */ + @Override public void endVersion(GridCacheVersion endVer) { + this.endVer = endVer; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion writeVersion() { + return writeVer == null ? commitVersion() : writeVer; + } + + /** {@inheritDoc} */ + @Override public void writeVersion(GridCacheVersion writeVer) { + this.writeVer = writeVer; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid timeoutId() { + return xidVer.asGridUuid(); + } + + /** {@inheritDoc} */ + @Override public long endTime() { + long endTime = timeout == 0 ? Long.MAX_VALUE : startTime + timeout; + + return endTime > 0 ? endTime : endTime < 0 ? Long.MAX_VALUE : endTime; + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + state(MARKED_ROLLBACK, true); + } + + /** {@inheritDoc} */ + @Override public boolean timedOut() { + return timedOut; + } + + /** {@inheritDoc} */ + @Override public void invalidate(boolean invalidate) { + if (isStarted() && !dht()) + throw new IllegalStateException("Cannot change invalidation flag after transaction has started: " + this); + + this.invalidate = invalidate; + } + + /** {@inheritDoc} */ + @Override public boolean isInvalidate() { + return invalidate; + } + + /** {@inheritDoc} */ + @Override public boolean isSystemInvalidate() { + return sysInvalidate; + } + + /** {@inheritDoc} */ + @Override public void systemInvalidate(boolean sysInvalidate) { + this.sysInvalidate = sysInvalidate; + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<UUID, Collection<UUID>> transactionNodes() { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public GridCacheVersion nearXidVersion() { + return null; + } + + /** + * @param txEntry Entry to process. + * @param metrics {@code True} if metrics should be updated. + * @return Tuple containing transformation results. + * @throws IgniteCheckedException If failed to get previous value for transform. + * @throws GridCacheEntryRemovedException If entry was concurrently deleted. + */ + protected GridTuple3<GridCacheOperation, V, byte[]> applyTransformClosures(IgniteTxEntry<K, V> txEntry, + boolean metrics) throws GridCacheEntryRemovedException, IgniteCheckedException { + GridCacheContext cacheCtx = txEntry.context(); + + assert cacheCtx != null; + + if (isSystemInvalidate()) + return F.t(cacheCtx.isStoreEnabled() ? RELOAD : DELETE, null, null); + if (F.isEmpty(txEntry.transformClosures())) + return F.t(txEntry.op(), txEntry.value(), txEntry.valueBytes()); + else { + try { + boolean recordEvt = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ); + + V val = txEntry.hasValue() ? txEntry.value() : + txEntry.cached().innerGet(this, + /*swap*/false, + /*read through*/false, + /*fail fast*/true, + /*unmarshal*/true, + /*metrics*/metrics, + /*event*/recordEvt, + /*temporary*/true, + /*subjId*/subjId, + /**closure name */recordEvt ? F.first(txEntry.transformClosures()) : null, + resolveTaskName(), + CU.<K, V>empty()); + + try { + for (IgniteClosure<V, V> clos : txEntry.transformClosures()) + val = clos.apply(val); + } + catch (Throwable e) { + throw new IgniteException("Transform closure must not throw any exceptions " + + "(transaction will be invalidated)", e); + } + + GridCacheOperation op = val == null ? DELETE : UPDATE; + + return F.t(op, (V)cacheCtx.<V>unwrapTemporary(val), null); + } + catch (GridCacheFilterFailedException e) { + assert false : "Empty filter failed for innerGet: " + e; + + return null; + } + } + } + + /** + * @return Resolves task name. + */ + public String resolveTaskName() { + if (taskName != null) + return taskName; + + return (taskName = cctx.kernalContext().task().resolveTaskName(taskNameHash)); + } + + /** + * @param e Transaction entry. + * @param primaryOnly Flag to include backups into check or not. + * @return {@code True} if entry is locally mapped as a primary or back up node. + */ + protected boolean isNearLocallyMapped(IgniteTxEntry<K, V> e, boolean primaryOnly) { + GridCacheContext<K, V> cacheCtx = e.context(); + + if (!cacheCtx.isNear()) + return false; + + // Try to take either entry-recorded primary node ID, + // or transaction node ID from near-local transactions. + UUID nodeId = e.nodeId() == null ? local() ? this.nodeId : null : e.nodeId(); + + if (nodeId != null && nodeId.equals(cctx.localNodeId())) + return true; + + GridCacheEntryEx<K, V> cached = e.cached(); + + int part = cached != null ? cached.partition() : cacheCtx.affinity().partition(e.key()); + + List<ClusterNode> affNodes = cacheCtx.affinity().nodes(part, topologyVersion()); + + e.locallyMapped(F.contains(affNodes, cctx.localNode())); + + if (primaryOnly) { + ClusterNode primary = F.first(affNodes); + + if (primary == null && !isAffinityNode(cacheCtx.config())) + return false; + + assert primary != null : "Primary node is null for affinity nodes: " + affNodes; + + return primary.isLocal(); + } + else + return e.locallyMapped(); + } + + /** + * @param e Entry to evict if it qualifies for eviction. + * @param primaryOnly Flag to try to evict only on primary node. + * @return {@code True} if attempt was made to evict the entry. + * @throws IgniteCheckedException If failed. + */ + protected boolean evictNearEntry(IgniteTxEntry<K, V> e, boolean primaryOnly) throws IgniteCheckedException { + assert e != null; + + if (isNearLocallyMapped(e, primaryOnly)) { + GridCacheEntryEx<K, V> cached = e.cached(); + + assert cached instanceof GridNearCacheEntry : "Invalid cache entry: " + e; + + if (log.isDebugEnabled()) + log.debug("Evicting dht-local entry from near cache [entry=" + cached + ", tx=" + this + ']'); + + if (cached != null && cached.markObsolete(xidVer)) + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + writeExternalMeta(out); + + out.writeObject(xidVer); + out.writeBoolean(invalidate); + out.writeLong(timeout); + out.writeLong(threadId); + out.writeLong(startTime); + + U.writeUuid(out, nodeId); + + out.write(isolation.ordinal()); + out.write(concurrency.ordinal()); + out.write(state().ordinal()); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + readExternalMeta(in); + + xidVer = (GridCacheVersion)in.readObject(); + invalidate = in.readBoolean(); + timeout = in.readLong(); + threadId = in.readLong(); + startTime = in.readLong(); + + nodeId = U.readUuid(in); + + isolation = IgniteTxIsolation.fromOrdinal(in.read()); + concurrency = IgniteTxConcurrency.fromOrdinal(in.read()); + + state = IgniteTxState.fromOrdinal(in.read()); + } + + /** + * Reconstructs object on unmarshalling. + * + * @return Reconstructed object. + * @throws ObjectStreamException Thrown in case of unmarshalling error. + */ + protected Object readResolve() throws ObjectStreamException { + return new TxShadow( + xidVer.asGridUuid(), + nodeId, + threadId, + startTime, + isolation, + concurrency, + invalidate, + implicit, + timeout, + state(), + isRollbackOnly() + ); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + return o == this || (o instanceof IgniteTxAdapter && xidVer.equals(((IgniteTxAdapter)o).xidVer)); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return xidVer.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return GridToStringBuilder.toString(IgniteTxAdapter.class, this, + "duration", (U.currentTimeMillis() - startTime) + "ms", "grpLock", groupLock(), + "onePhaseCommit", onePhaseCommit); + } + + /** + * Transaction shadow class to be used for deserialization. + */ + private static class TxShadow extends GridMetadataAwareAdapter implements IgniteTx { + /** */ + private static final long serialVersionUID = 0L; + + /** Xid. */ + private final IgniteUuid xid; + + /** Node ID. */ + private final UUID nodeId; + + /** Thread ID. */ + private final long threadId; + + /** Start time. */ + private final long startTime; + + /** Transaction isolation. */ + private final IgniteTxIsolation isolation; + + /** Concurrency. */ + private final IgniteTxConcurrency concurrency; + + /** Invalidate flag. */ + private final boolean invalidate; + + /** Timeout. */ + private final long timeout; + + /** State. */ + private final IgniteTxState state; + + /** Rollback only flag. */ + private final boolean rollbackOnly; + + /** Implicit flag. */ + private final boolean implicit; + + /** + * @param xid Xid. + * @param nodeId Node ID. + * @param threadId Thread ID. + * @param startTime Start time. + * @param isolation Isolation. + * @param concurrency Concurrency. + * @param invalidate Invalidate flag. + * @param implicit Implicit flag. + * @param timeout Transaction timeout. + * @param state Transaction state. + * @param rollbackOnly Rollback-only flag. + */ + TxShadow(IgniteUuid xid, UUID nodeId, long threadId, long startTime, IgniteTxIsolation isolation, + IgniteTxConcurrency concurrency, boolean invalidate, boolean implicit, long timeout, + IgniteTxState state, boolean rollbackOnly) { + this.xid = xid; + this.nodeId = nodeId; + this.threadId = threadId; + this.startTime = startTime; + this.isolation = isolation; + this.concurrency = concurrency; + this.invalidate = invalidate; + this.implicit = implicit; + this.timeout = timeout; + this.state = state; + this.rollbackOnly = rollbackOnly; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid xid() { + return xid; + } + + /** {@inheritDoc} */ + @Override public UUID nodeId() { + return nodeId; + } + + /** {@inheritDoc} */ + @Override public long threadId() { + return threadId; + } + + /** {@inheritDoc} */ + @Override public long startTime() { + return startTime; + } + + /** {@inheritDoc} */ + @Override public IgniteTxIsolation isolation() { + return isolation; + } + + /** {@inheritDoc} */ + @Override public IgniteTxConcurrency concurrency() { + return concurrency; + } + + /** {@inheritDoc} */ + @Override public boolean isInvalidate() { + return invalidate; + } + + /** {@inheritDoc} */ + @Override public boolean implicit() { + return implicit; + } + + /** {@inheritDoc} */ + @Override public long timeout() { + return timeout; + } + + /** {@inheritDoc} */ + @Override public IgniteTxState state() { + return state; + } + + /** {@inheritDoc} */ + @Override public boolean isRollbackOnly() { + return rollbackOnly; + } + + /** {@inheritDoc} */ + @Override public long timeout(long timeout) { + throw new IllegalStateException("Deserialized transaction can only be used as read-only."); + } + + /** {@inheritDoc} */ + @Override public boolean setRollbackOnly() { + throw new IllegalStateException("Deserialized transaction can only be used as read-only."); + } + + /** {@inheritDoc} */ + @Override public void commit() { + throw new IllegalStateException("Deserialized transaction can only be used as read-only."); + } + + /** {@inheritDoc} */ + @Override public void close() { + throw new IllegalStateException("Deserialized transaction can only be used as read-only."); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<IgniteTx> commitAsync() { + throw new IllegalStateException("Deserialized transaction can only be used as read-only."); + } + + /** {@inheritDoc} */ + @Override public void rollback() { + throw new IllegalStateException("Deserialized transaction can only be used as read-only."); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + return this == o || o instanceof IgniteTx && xid.equals(((IgniteTx)o).xid()); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return xid.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TxShadow.class, this); + } + } +}