http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java index 2b974e9..df4dd58 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java @@ -81,6 +81,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { * @param ctx Cache registry. * @param implicit Implicit flag. * @param implicitSingle Implicit with one key flag. + * @param sys System flag. * @param concurrency Concurrency. * @param isolation Isolation. * @param timeout Timeout. @@ -92,6 +93,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { GridCacheSharedContext<K, V> ctx, boolean implicit, boolean implicitSingle, + boolean sys, GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation, long timeout, @@ -104,10 +106,11 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { int taskNameHash ) { super( + ctx, ctx.versions().next(), implicit, implicitSingle, - ctx, + sys, concurrency, isolation, timeout,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index 5bc5a3e..9098b73 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -30,6 +30,7 @@ import java.util.*; import java.util.concurrent.atomic.*; import static org.gridgain.grid.cache.GridCacheTxState.*; +import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*; import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*; /** @@ -640,7 +641,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut add(fut); // Append new future. try { - cctx.io().send(n, req); + cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } catch (IgniteCheckedException e) { // Fail the whole thing. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index 30b7aef..431e134 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -242,19 +242,19 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ } switch (commState.idx) { - case 21: + case 22: if (!commState.putGridUuid(futId)) return false; commState.idx++; - case 22: + case 23: if (!commState.putBoolean(last)) return false; commState.idx++; - case 23: + case 24: if (lastBackups != null) { if (commState.it == null) { if (!commState.putInt(lastBackups.size())) @@ -281,31 +281,31 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ commState.idx++; - case 24: + case 25: if (!commState.putGridUuid(miniId)) return false; commState.idx++; - case 25: + case 26: if (!commState.putBoolean(near)) return false; commState.idx++; - case 26: + case 27: if (!commState.putLong(topVer)) return false; commState.idx++; - case 27: + case 28: if (!commState.putUuid(subjId)) return false; commState.idx++; - case 28: + case 29: if (!commState.putInt(taskNameHash)) return false; @@ -325,7 +325,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ return false; switch (commState.idx) { - case 21: + case 22: IgniteUuid futId0 = commState.getGridUuid(); if (futId0 == GRID_UUID_NOT_READ) @@ -335,7 +335,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ commState.idx++; - case 22: + case 23: if (buf.remaining() < 1) return false; @@ -343,7 +343,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ commState.idx++; - case 23: + case 24: if (commState.readSize == -1) { if (buf.remaining() < 4) return false; @@ -372,7 +372,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ commState.idx++; - case 24: + case 25: IgniteUuid miniId0 = commState.getGridUuid(); if (miniId0 == GRID_UUID_NOT_READ) @@ -382,7 +382,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ commState.idx++; - case 25: + case 26: if (buf.remaining() < 1) return false; @@ -390,7 +390,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ commState.idx++; - case 26: + case 27: if (buf.remaining() < 8) return false; @@ -398,7 +398,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ commState.idx++; - case 27: + case 28: UUID subjId0 = commState.getUuid(); if (subjId0 == UUID_NOT_READ) @@ -408,7 +408,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ commState.idx++; - case 28: + case 29: if (buf.remaining() < 4) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java index aa3546b..938e2d2 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java @@ -62,6 +62,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> * @param rmtThreadId Remote thread ID. * @param xidVer XID version. * @param commitVer Commit version. + * @param sys System flag. * @param concurrency Concurrency level (should be pessimistic). * @param isolation Transaction isolation. * @param invalidate Invalidate flag. @@ -73,24 +74,25 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> * @throws IgniteCheckedException If unmarshalling failed. */ public GridNearTxRemote( + GridCacheSharedContext<K, V> ctx, ClassLoader ldr, UUID nodeId, UUID nearNodeId, long rmtThreadId, GridCacheVersion xidVer, GridCacheVersion commitVer, + boolean sys, GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation, boolean invalidate, long timeout, Collection<GridCacheTxEntry<K, V>> writeEntries, - GridCacheSharedContext<K, V> ctx, int txSize, @Nullable GridCacheTxKey grpLockKey, @Nullable UUID subjId, int taskNameHash ) throws IgniteCheckedException { - super(ctx, nodeId, rmtThreadId, xidVer, commitVer, concurrency, isolation, invalidate, timeout, txSize, + super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize, grpLockKey, subjId, taskNameHash); assert nearNodeId != null; @@ -119,6 +121,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> * @param rmtThreadId Remote thread ID. * @param xidVer XID version. * @param commitVer Commit version. + * @param sys System flag. * @param concurrency Concurrency level (should be pessimistic). * @param isolation Transaction isolation. * @param invalidate Invalidate flag. @@ -128,23 +131,24 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> * @param grpLockKey Collection of group lock keys if this is a group-lock transaction. */ public GridNearTxRemote( + GridCacheSharedContext<K, V> ctx, UUID nodeId, UUID nearNodeId, GridCacheVersion nearXidVer, long rmtThreadId, GridCacheVersion xidVer, GridCacheVersion commitVer, + boolean sys, GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation, boolean invalidate, long timeout, - GridCacheSharedContext<K, V> ctx, int txSize, @Nullable GridCacheTxKey grpLockKey, @Nullable UUID subjId, int taskNameHash ) { - super(ctx, nodeId, rmtThreadId, xidVer, commitVer, concurrency, isolation, invalidate, timeout, txSize, + super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize, grpLockKey, subjId, taskNameHash); assert nearNodeId != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java index cebd888..aac60fd 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java @@ -60,8 +60,8 @@ class GridLocalTx<K, V> extends GridCacheTxLocalAdapter<K, V> { @Nullable UUID subjId, int taskNameHash ) { - super(ctx, ctx.versions().next(), implicit, implicitSingle, concurrency, isolation, timeout, false, true, txSize, - null, false, subjId, taskNameHash); + super(ctx, ctx.versions().next(), implicit, implicitSingle, false, concurrency, isolation, timeout, false, true, + txSize, null, false, subjId, taskNameHash); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java index 60eec9e..aece7cf 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java @@ -12,6 +12,7 @@ package org.gridgain.grid.kernal.processors.cache.transactions; import org.apache.ignite.*; import org.apache.ignite.lang.*; import org.gridgain.grid.cache.*; +import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; @@ -25,7 +26,7 @@ import static org.gridgain.grid.cache.GridCacheTxIsolation.*; /** * Grid transactions implementation. */ -public class IgniteTransactionsImpl<K, V> implements IgniteTransactions { +public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { /** Cache shared context. */ private GridCacheSharedContext<K, V> cctx; @@ -44,7 +45,9 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions { cfg.getDefaultTxConcurrency(), cfg.getDefaultTxIsolation(), cfg.getDefaultTxTimeout(), - 0); + 0, + false + ); } /** {@inheritDoc} */ @@ -58,7 +61,8 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions { concurrency, isolation, cfg.getDefaultTxTimeout(), - 0 + 0, + false ); } @@ -74,7 +78,24 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions { concurrency, isolation, timeout, - txSize + txSize, + false + ); + } + + @Override public GridCacheTx txStartSystem(GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation, + long timeout, int txSize) { + A.notNull(concurrency, "concurrency"); + A.notNull(isolation, "isolation"); + A.ensure(timeout >= 0, "timeout cannot be negative"); + A.ensure(txSize >= 0, "transaction size cannot be negative"); + + return txStart0( + concurrency, + isolation, + timeout, + txSize, + true ); } @@ -83,10 +104,11 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions { * @param isolation Transaction isolation. * @param timeout Transaction timeout. * @param txSize Expected transaction size. + * @param sys System flag. * @return Transaction. */ private GridCacheTx txStart0(GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation, - long timeout, int txSize) { + long timeout, int txSize, boolean sys) { GridTransactionsConfiguration cfg = cctx.gridConfig().getTransactionsConfiguration(); if (!cfg.isTxSerializableEnabled() && isolation == SERIALIZABLE) @@ -102,6 +124,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions { tx = cctx.tm().newTx( false, false, + sys, concurrency, isolation, timeout, @@ -128,7 +151,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions { throw new IllegalArgumentException("Failed to find cache with given name (cache is not configured): " + cacheName); - return txStartGroupLock(cache.context(), affinityKey, concurrency, isolation, false, timeout, txSize); + return txStartGroupLock(cache.context(), affinityKey, concurrency, isolation, false, timeout, txSize, false); } /** {@inheritDoc} */ @@ -142,7 +165,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions { Object grpLockKey = cache.context().affinity().partitionAffinityKey(partId); - return txStartGroupLock(cache.context(), grpLockKey, concurrency, isolation, true, timeout, txSize); + return txStartGroupLock(cache.context(), grpLockKey, concurrency, isolation, true, timeout, txSize, false); } /** @@ -155,13 +178,14 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions { * should be a unique partition-specific key. * @param timeout Tx timeout. * @param txSize Expected transaction size. + * @param sys System flag. * @return Started transaction. * @throws IllegalStateException If other transaction was already started. * @throws IgniteCheckedException In case of error. */ @SuppressWarnings("unchecked") private GridCacheTx txStartGroupLock(GridCacheContext ctx, Object grpLockKey, GridCacheTxConcurrency concurrency, - GridCacheTxIsolation isolation, boolean partLock, long timeout, int txSize) + GridCacheTxIsolation isolation, boolean partLock, long timeout, int txSize, boolean sys) throws IllegalStateException, IgniteCheckedException { GridCacheTx tx = cctx.tm().userTx(); @@ -172,6 +196,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions { GridCacheTxLocalAdapter<K, V> tx0 = cctx.tm().newTx( false, false, + sys, concurrency, isolation, timeout, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java ---------------------------------------------------------------------- diff --git a/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java b/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java index 6044900..e1e24bb 100644 --- a/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java +++ b/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java @@ -64,16 +64,17 @@ public class GridCacheJtaManager<K, V> extends GridCacheJtaManagerAdapter<K, V> .getTransactionsConfiguration(); tx = cctx.tm().newTx( - false, - false, + /*implicit*/false, + /*implicit single*/false, + /*system*/false, tCfg.getDefaultTxConcurrency(), tCfg.getDefaultTxIsolation(), tCfg.getDefaultTxTimeout(), - false, - true, - 0, - /** group lock keys */null, - /** partition lock */false + /*invalidate*/false, + /*store enabled*/true, + /*tx size*/0, + /*group lock keys*/null, + /*partition lock*/false ); }