GG-9141 - Fixes for DR transactions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/88a2d8da Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/88a2d8da Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/88a2d8da Branch: refs/heads/ignite-41 Commit: 88a2d8da1ca94a6e217c796cc958dfbfc9393eca Parents: a70cfa2 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Wed Dec 17 18:57:14 2014 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Wed Dec 17 18:57:14 2014 -0800 ---------------------------------------------------------------------- .../gridgain/grid/kernal/GridKernalContext.java | 2 +- .../grid/kernal/IgniteTransactionsEx.java | 33 +++++++++++++ .../processors/cache/GridCacheAdapter.java | 2 + .../processors/cache/GridCacheContext.java | 26 ++++++---- .../processors/cache/GridCacheIoManager.java | 32 ++++++++----- .../processors/cache/GridCacheTxAdapter.java | 14 ++++++ .../kernal/processors/cache/GridCacheTxEx.java | 9 ++++ .../processors/cache/GridCacheTxHandler.java | 25 ++++++---- .../cache/GridCacheTxLocalAdapter.java | 4 +- .../processors/cache/GridCacheTxManager.java | 2 + .../kernal/processors/cache/GridCacheUtils.java | 13 +++-- .../GridDistributedTxFinishRequest.java | 26 ++++++++++ .../GridDistributedTxPrepareRequest.java | 24 ++++++++++ .../GridDistributedTxRemoteAdapter.java | 3 ++ .../distributed/dht/GridDhtLockFuture.java | 6 +-- .../dht/GridDhtTransactionalCacheAdapter.java | 11 +++-- .../distributed/dht/GridDhtTxFinishFuture.java | 7 ++- .../distributed/dht/GridDhtTxFinishRequest.java | 50 ++++++++++---------- .../cache/distributed/dht/GridDhtTxLocal.java | 9 ++-- .../distributed/dht/GridDhtTxLocalAdapter.java | 8 ++-- .../distributed/dht/GridDhtTxPrepareFuture.java | 9 ++-- .../dht/GridDhtTxPrepareRequest.java | 48 +++++++++---------- .../cache/distributed/dht/GridDhtTxRemote.java | 12 +++-- .../colocated/GridDhtColocatedLockFuture.java | 5 +- .../distributed/near/GridNearLockFuture.java | 5 +- .../near/GridNearTransactionalCache.java | 3 +- .../near/GridNearTxFinishFuture.java | 5 +- .../near/GridNearTxFinishRequest.java | 24 +++++----- .../cache/distributed/near/GridNearTxLocal.java | 5 +- .../near/GridNearTxPrepareFuture.java | 3 +- .../near/GridNearTxPrepareRequest.java | 32 ++++++------- .../distributed/near/GridNearTxRemote.java | 12 +++-- .../processors/cache/local/GridLocalTx.java | 4 +- .../transactions/IgniteTransactionsImpl.java | 41 ++++++++++++---- .../cache/jta/GridCacheJtaManager.java | 15 +++--- 35 files changed, 367 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernalContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernalContext.java index 7b161a4..e1b9d92 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernalContext.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernalContext.java @@ -483,7 +483,7 @@ public interface GridKernalContext extends GridMetadataAware, Iterable<GridCompo /** * @param name Plugin name. * @return Plugin provider instance. - * @throws org.apache.ignite.plugin.PluginNotFoundException If plugin provider for the given name was not found. + * @throws PluginNotFoundException If plugin provider for the given name was not found. */ public PluginProvider pluginProvider(String name) throws PluginNotFoundException; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteTransactionsEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteTransactionsEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteTransactionsEx.java new file mode 100644 index 0000000..8666e66 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteTransactionsEx.java @@ -0,0 +1,33 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal; + +import org.apache.ignite.*; +import org.gridgain.grid.cache.*; + +/** + * Extended interface to work with system transactions. + */ +public interface IgniteTransactionsEx extends IgniteTransactions { + /** + * Starts transaction with specified isolation, concurrency, timeout, invalidation flag, + * and number of participating entries. + * + * @param concurrency Concurrency. + * @param isolation Isolation. + * @param timeout Timeout. + * @param txSize Number of entries participating in transaction (may be approximate). + * @return New transaction. + * @throws IllegalStateException If transaction is already started by this thread. + * @throws UnsupportedOperationException If cache is {@link GridCacheAtomicityMode#ATOMIC}. + */ + public GridCacheTx txStartSystem(GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation, long timeout, + int txSize); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index 720ff36..b5bd597 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -3603,6 +3603,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im tx = ctx.tm().newTx( true, op.single(), + ctx.system(), PESSIMISTIC, READ_COMMITTED, tCfg.getDefaultTxTimeout(), @@ -3677,6 +3678,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im tx = ctx.tm().newTx( true, op.single(), + ctx.system(), PESSIMISTIC, READ_COMMITTED, ctx.kernalContext().config().getTransactionsConfiguration().getDefaultTxTimeout(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java index 98766fb..a15713e 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java @@ -169,6 +169,9 @@ public class GridCacheContext<K, V> implements Externalizable { /** Cache ID. */ private int cacheId; + /** System cache flag. */ + private boolean sys; + /** * Empty constructor required for {@link Externalizable}. */ @@ -274,6 +277,8 @@ public class GridCacheContext<K, V> implements Externalizable { } else cacheId = 1; + + sys = CU.UTILITY_CACHE_NAME.equals(cacheName); } /** @@ -309,6 +314,13 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @return System cache flag. + */ + public boolean system() { + return sys; + } + + /** * @param cache Cache. */ public void cache(GridCacheAdapter<K, V> cache) { @@ -928,8 +940,7 @@ public class GridCacheContext<K, V> implements Externalizable { } /** - * Same as {@link GridFunc#isAll(Object, org.apache.ignite.lang.IgnitePredicate[])}, but safely unwraps - * exceptions. + * Same as {@link GridFunc#isAll(Object, IgnitePredicate[])}, but safely unwraps exceptions. * * @param e Element. * @param p Predicates. @@ -937,14 +948,13 @@ public class GridCacheContext<K, V> implements Externalizable { * @throws IgniteCheckedException If failed. */ @SuppressWarnings({"ErrorNotRethrown"}) - public <K, V> boolean isAll(GridCacheEntryEx<K, V> e, - @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] p) throws IgniteCheckedException { + public <K1, V1> boolean isAll(GridCacheEntryEx<K1, V1> e, + @Nullable IgnitePredicate<GridCacheEntry<K1, V1>>[] p) throws IgniteCheckedException { return F.isEmpty(p) || isAll(e.wrap(false), p); } /** - * Same as {@link GridFunc#isAll(Object, org.apache.ignite.lang.IgnitePredicate[])}, but safely unwraps - * exceptions. + * Same as {@link GridFunc#isAll(Object, IgnitePredicate[])}, but safely unwraps exceptions. * * @param e Element. * @param p Predicates. @@ -1569,7 +1579,7 @@ public class GridCacheContext<K, V> implements Externalizable { /** * @param obj Object. * @return Portable object. - * @throws org.apache.ignite.portables.PortableException In case of error. + * @throws PortableException In case of error. */ @Nullable public Object marshalToPortable(@Nullable Object obj) throws PortableException { assert portableEnabled(); @@ -1634,7 +1644,7 @@ public class GridCacheContext<K, V> implements Externalizable { * @param col List to unwrap. * @return Unwrapped list. */ - private ArrayList<Object> unwrapPortables(ArrayList<Object> col) { + private Collection<Object> unwrapPortables(ArrayList<Object> col) { int size = col.size(); for (int i = 0; i < size; i++) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java index a222c32..92cdb9a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java @@ -65,9 +65,6 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V /** Deployment enabled. */ private boolean depEnabled; - /** IO policy. */ - private GridIoPolicy plc; - /** Message listener. */ private GridMessageListener lsnr = new GridMessageListener() { @SuppressWarnings("unchecked") @@ -132,10 +129,6 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V retryDelay = cctx.gridConfig().getNetworkSendRetryDelay(); retryCnt = cctx.gridConfig().getNetworkSendRetryCount(); - //String cacheName = cctx.name(); TODO GG-9141 how to determine policy? - - plc = SYSTEM_POOL; // TODO GG-9141 CU.isDrSystemCache(cacheName) ? DR_POOL : SYSTEM_POOL; - depEnabled = cctx.gridDeploy().enabled(); cctx.gridIO().addMessageListener(TOPIC_CACHE, lsnr); @@ -333,7 +326,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V * @param node Node to send the message to. * @param msg Message to send. * @throws IgniteCheckedException If sending failed. - * @throws org.apache.ignite.cluster.ClusterTopologyException If receiver left. + * @throws ClusterTopologyException If receiver left. */ public void send(ClusterNode node, GridCacheMessage<K, V> msg) throws IgniteCheckedException { send(node, msg, SYSTEM_POOL); @@ -345,7 +338,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V * @param node Node to send the message to. * @param msg Message to send. * @throws IgniteCheckedException If sending failed. - * @throws org.apache.ignite.cluster.ClusterTopologyException If receiver left. + * @throws ClusterTopologyException If receiver left. */ public void send(ClusterNode node, GridCacheMessage<K, V> msg, GridIoPolicy plc) throws IgniteCheckedException { assert !node.isLocal(); @@ -444,7 +437,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V else msg0 = (GridCacheMessage<K, V>)msg.clone(); - cctx.gridIO().send(nodesView, TOPIC_CACHE, msg0, plc); + cctx.gridIO().send(nodesView, TOPIC_CACHE, msg0, SYSTEM_POOL); boolean added = false; @@ -537,6 +530,23 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V } /** + * Sends communication message. + * + * @param nodeId ID of node to send the message to. + * @param msg Message to send. + * @throws IgniteCheckedException If sending failed. + */ + public void send(UUID nodeId, GridCacheMessage<K, V> msg, GridIoPolicy plc) throws IgniteCheckedException { + ClusterNode n = cctx.discovery().node(nodeId); + + if (n == null) + throw new ClusterTopologyException("Failed to send message because node left grid [node=" + n + ", msg=" + + msg + ']'); + + send(n, msg, plc); + } + + /** * @param node Destination node. * @param topic Topic to send the message to. * @param msgId Ordered message ID. @@ -554,7 +564,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V try { cnt++; - cctx.gridIO().sendOrderedMessage(node, topic, msgId, msg, plc, timeout, false); + cctx.gridIO().sendOrderedMessage(node, topic, msgId, msg, SYSTEM_POOL, timeout, false); if (log.isDebugEnabled()) log.debug("Sent ordered cache message [topic=" + topic + ", msg=" + msg + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java index 7a32afa..ebd862b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java @@ -117,6 +117,9 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter /** Internal flag. */ protected boolean internal; + /** System transaction flag. */ + private boolean sys; + /** */ protected boolean onePhaseCommit; @@ -202,6 +205,7 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter * @param implicit Implicit flag. * @param implicitSingle Implicit with one key flag. * @param loc Local flag. + * @param sys System transaction flag. * @param concurrency Concurrency. * @param isolation Isolation. * @param timeout Timeout. @@ -214,6 +218,7 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter boolean implicit, boolean implicitSingle, boolean loc, + boolean sys, GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation, long timeout, @@ -232,6 +237,7 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter this.implicit = implicit; this.implicitSingle = implicitSingle; this.loc = loc; + this.sys = sys; this.concurrency = concurrency; this.isolation = isolation; this.timeout = timeout; @@ -257,6 +263,7 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter * @param xidVer Transaction ID. * @param startVer Start version mark. * @param threadId Thread ID. + * @param sys System transaction flag. * @param concurrency Concurrency. * @param isolation Isolation. * @param timeout Timeout. @@ -269,6 +276,7 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter GridCacheVersion xidVer, GridCacheVersion startVer, long threadId, + boolean sys, GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation, long timeout, @@ -282,6 +290,7 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter this.threadId = threadId; this.xidVer = xidVer; this.startVer = startVer; + this.sys = sys; this.concurrency = concurrency; this.isolation = isolation; this.timeout = timeout; @@ -394,6 +403,11 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter } /** {@inheritDoc} */ + @Override public boolean system() { + return sys; + } + + /** {@inheritDoc} */ @Override public boolean storeUsed() { return storeEnabled() && store() != null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java index 877c0f1..0dda62b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java @@ -54,6 +54,15 @@ public interface GridCacheTxEx<K, V> extends GridCacheTx, GridTimeoutObject { public boolean storeUsed(); /** + * Checks if this is system cache transaction. System transactions are isolated from user transactions + * because some of the public API methods may be invoked inside user transactions and internally start + * system cache transactions. + * + * @return {@code True} if transaction is started for system cache. + */ + public boolean system(); + + /** * @return Last recorded topology version. */ public long topologyVersion(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java index fa85566..88def0e 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java @@ -27,6 +27,7 @@ import java.util.*; import static org.gridgain.grid.cache.GridCacheTxConcurrency.*; import static org.gridgain.grid.cache.GridCacheTxIsolation.*; import static org.gridgain.grid.cache.GridCacheTxState.*; +import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*; import static org.gridgain.grid.kernal.processors.cache.GridCacheTxEx.FinalizationStatus.*; import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*; @@ -240,6 +241,7 @@ public class GridCacheTxHandler<K, V> { } else { tx = new GridDhtTxLocal<>( + ctx, nearNode.id(), req.version(), req.futureId(), @@ -247,7 +249,7 @@ public class GridCacheTxHandler<K, V> { req.threadId(), /*implicit*/false, /*implicit-single*/false, - ctx, + req.system(), req.concurrency(), req.isolation(), req.timeout(), @@ -468,7 +470,7 @@ public class GridCacheTxHandler<K, V> { req.miniId(), new IgniteCheckedException("Transaction has been already completed.")); try { - ctx.io().send(nodeId, res); + ctx.io().send(nodeId, res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } catch (Throwable e) { // Double-check. @@ -491,6 +493,7 @@ public class GridCacheTxHandler<K, V> { // Create transaction and add entries. tx = ctx.tm().onCreated( new GridDhtTxLocal<>( + ctx, nodeId, req.version(), req.futureId(), @@ -498,7 +501,7 @@ public class GridCacheTxHandler<K, V> { req.threadId(), true, false, /* we don't know, so assume false. */ - ctx, + req.system(), PESSIMISTIC, READ_COMMITTED, /*timeout */0, @@ -666,7 +669,7 @@ public class GridCacheTxHandler<K, V> { try { // Reply back to sender. - ctx.io().send(nodeId, res); + ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } catch (IgniteCheckedException e) { if (e instanceof ClusterTopologyException) { @@ -864,7 +867,7 @@ public class GridCacheTxHandler<K, V> { GridCacheMessage<K, V> res = new GridDhtTxFinishResponse<>(req.version(), req.futureId(), req.miniId()); try { - ctx.io().send(nodeId, res); + ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } catch (Throwable e) { // Double-check. @@ -895,6 +898,7 @@ public class GridCacheTxHandler<K, V> { if (tx == null) { tx = new GridDhtTxRemote<>( + ctx, req.nearNodeId(), req.futureId(), nodeId, @@ -902,11 +906,11 @@ public class GridCacheTxHandler<K, V> { req.topologyVersion(), req.version(), req.commitVersion(), + req.system(), req.concurrency(), req.isolation(), req.isInvalidate(), req.timeout(), - ctx, req.writes() != null ? Math.max(req.writes().size(), req.txSize()) : req.txSize(), req.groupLockKey(), req.nearXidVersion(), @@ -1012,18 +1016,19 @@ public class GridCacheTxHandler<K, V> { if (tx == null) { tx = new GridNearTxRemote<>( + ctx, ldr, nodeId, req.nearNodeId(), req.threadId(), req.version(), req.commitVersion(), + req.system(), req.concurrency(), req.isolation(), req.isInvalidate(), req.timeout(), req.nearWrites(), - ctx, req.txSize(), req.groupLockKey(), req.subjectId(), @@ -1100,6 +1105,7 @@ public class GridCacheTxHandler<K, V> { if (tx == null) { tx = new GridDhtTxRemote<>( + ctx, req.nearNodeId(), req.futureId(), nodeId, @@ -1109,11 +1115,11 @@ public class GridCacheTxHandler<K, V> { req.topologyVersion(), req.version(), /*commitVer*/null, + req.system(), PESSIMISTIC, req.isolation(), req.isInvalidate(), 0, - ctx, req.txSize(), req.groupLockKey(), req.subjectId(), @@ -1241,6 +1247,7 @@ public class GridCacheTxHandler<K, V> { if (tx == null) { tx = new GridNearTxRemote<>( + ctx, nodeId, req.nearNodeId(), // We can pass null as nearXidVer as transaction will be committed right away. @@ -1248,11 +1255,11 @@ public class GridCacheTxHandler<K, V> { req.threadId(), req.version(), null, + req.system(), PESSIMISTIC, req.isolation(), req.isInvalidate(), 0, - ctx, req.txSize(), req.groupLockKey(), req.subjectId(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java index 59f0e97..384b243 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java @@ -94,6 +94,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K * @param implicit {@code True} if transaction was implicitly started by the system, * {@code false} if it was started explicitly by user. * @param implicitSingle {@code True} if transaction is implicit with only one key. + * @param sys System flag. * @param concurrency Concurrency. * @param isolation Isolation. * @param timeout Timeout. @@ -106,6 +107,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K GridCacheVersion xidVer, boolean implicit, boolean implicitSingle, + boolean sys, GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation, long timeout, @@ -117,7 +119,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K @Nullable UUID subjId, int taskNameHash ) { - super(cctx, xidVer, implicit, implicitSingle, /*local*/true, concurrency, isolation, timeout, invalidate, + super(cctx, xidVer, implicit, implicitSingle, /*local*/true, sys, concurrency, isolation, timeout, invalidate, storeEnabled, txSize, grpLockKey, subjId, taskNameHash); assert !partLock || grpLockKey != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxManager.java index cc37438..cad83e5 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxManager.java @@ -361,6 +361,7 @@ public class GridCacheTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V public GridCacheTxLocalAdapter<K, V> newTx( boolean implicit, boolean implicitSingle, + boolean sys, GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation, long timeout, @@ -377,6 +378,7 @@ public class GridCacheTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V cctx, implicit, implicitSingle, + sys, concurrency, isolation, timeout, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java index d28f728..acd81ef 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java @@ -14,7 +14,6 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.fs.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; @@ -51,9 +50,6 @@ public class GridCacheUtils { /** Security system cache name. */ public static final String UTILITY_CACHE_NAME = "gg-sys-cache"; - /** Flag to turn off DHT cache for debugging purposes. */ - public static final boolean DHT_ENABLED = true; - /** Default mask name. */ private static final String DEFAULT_MASK_NAME = "<default>"; @@ -1522,6 +1518,15 @@ public class GridCacheUtils { } /** + * @return Cache ID for utility cache. + */ + public static int utilityCacheId() { + int hc = UTILITY_CACHE_NAME.hashCode(); + + return hc == 0 ? 1 : hc; + } + + /** * Validates that cache key or cache value implements {@link Externalizable} * * @param log Logger used to log warning message. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxFinishRequest.java index d2093c2..15d2ce3 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxFinishRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxFinishRequest.java @@ -82,6 +82,9 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes /** Group lock key bytes. */ private byte[] grpLockKeyBytes; + /** System flag. */ + private boolean sys; + /** * Empty constructor required by {@link Externalizable}. */ @@ -96,6 +99,7 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes * @param commitVer Commit version. * @param commit Commit flag. * @param invalidate Invalidate flag. + * @param sys System flag. * @param baseVer Base version. * @param committedVers Committed versions. * @param rolledbackVers Rolled back versions. @@ -112,6 +116,7 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes long threadId, boolean commit, boolean invalidate, + boolean sys, boolean syncCommit, boolean syncRollback, GridCacheVersion baseVer, @@ -130,6 +135,7 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes this.threadId = threadId; this.commit = commit; this.invalidate = invalidate; + this.sys = sys; this.syncCommit = syncCommit; this.syncRollback = syncRollback; this.baseVer = baseVer; @@ -164,6 +170,13 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes } /** + * @return System flag. + */ + public boolean system() { + return sys; + } + + /** * @return Future ID. */ public IgniteUuid futureId() { @@ -350,6 +363,7 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes _clone.txSize = txSize; _clone.grpLockKey = grpLockKey; _clone.grpLockKeyBytes = grpLockKeyBytes; + _clone.sys = sys; } /** {@inheritDoc} */ @@ -482,6 +496,11 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes commState.idx++; + case 20: + if (!commState.putBoolean(sys)) + return false; + + commState.idx++; } return true; @@ -642,6 +661,13 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes commState.idx++; + case 20: + if (buf.remaining() < 1) + return false; + + sys = commState.getBoolean(); + + commState.idx++; } return true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index 791c4f7..6c48627 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -103,6 +103,9 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe /** */ private byte[] txNodesBytes; + /** System flag. */ + private boolean sys; + /** * Required by {@link Externalizable}. */ @@ -135,6 +138,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe timeout = tx.timeout(); invalidate = tx.isInvalidate(); txSize = tx.size(); + sys = tx.system(); this.reads = reads; this.writes = writes; @@ -151,6 +155,13 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe } /** + * @return System flag. + */ + public boolean system() { + return sys; + } + + /** * Adds version to be verified on remote node. * * @param key Key for which version is verified. @@ -415,6 +426,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe _clone.txSize = txSize; _clone.txNodes = txNodes; _clone.txNodesBytes = txNodesBytes; + _clone.sys = sys; } /** {@inheritDoc} */ @@ -553,6 +565,11 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe commState.idx++; + case 21: + if (!commState.putBoolean(sys)) + return false; + + commState.idx++; } return true; @@ -725,6 +742,13 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe commState.idx++; + case 21: + if (buf.remaining() < 1) + return false; + + sys = commState.getBoolean(); + + commState.idx++; } return true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 3cd3e2d..f38d483 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -74,6 +74,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends GridCacheTxAdapter<K, * @param rmtThreadId Remote thread ID. * @param xidVer XID version. * @param commitVer Commit version. + * @param sys System flag. * @param concurrency Concurrency level (should be pessimistic). * @param isolation Transaction isolation. * @param invalidate Invalidate flag. @@ -87,6 +88,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends GridCacheTxAdapter<K, long rmtThreadId, GridCacheVersion xidVer, GridCacheVersion commitVer, + boolean sys, GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation, boolean invalidate, @@ -102,6 +104,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends GridCacheTxAdapter<K, xidVer, ctx.versions().last(), Thread.currentThread().getId(), + sys, concurrency, isolation, timeout, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java index be4153a..0187e5f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.*; @@ -30,6 +29,7 @@ import java.util.*; import java.util.concurrent.atomic.*; import static org.apache.ignite.events.IgniteEventType.*; +import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*; import static org.gridgain.grid.kernal.processors.dr.GridDrType.*; /** @@ -862,7 +862,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo if (log.isDebugEnabled()) log.debug("Sending DHT lock request to DHT node [node=" + n.id() + ", req=" + req + ']'); - cctx.io().send(n, req); + cctx.io().send(n, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } catch (IgniteCheckedException e) { // Fail the whole thing. @@ -924,7 +924,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo log.debug("Sending DHT lock request to near node [node=" + n.id() + ", req=" + req + ']'); - cctx.io().send(n, req); + cctx.io().send(n, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } catch (ClusterTopologyException e) { fut.onResult(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 3f30801..2113a68 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -29,6 +29,7 @@ import java.util.*; import static org.gridgain.grid.cache.GridCacheTxConcurrency.*; import static org.gridgain.grid.cache.GridCacheTxState.*; +import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*; import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*; import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*; @@ -179,6 +180,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (tx == null) { tx = new GridDhtTxRemote<>( + ctx.shared(), req.nodeId(), req.futureId(), nodeId, @@ -187,11 +189,11 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach req.topologyVersion(), req.version(), /*commitVer*/null, + ctx.system(), PESSIMISTIC, req.isolation(), req.isInvalidate(), req.timeout(), - ctx.shared(), req.txSize(), req.groupLockKey(), req.subjectId(), @@ -423,7 +425,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (res != null) { try { // Reply back to sender. - ctx.io().send(nodeId, res); + ctx.io().send(nodeId, res, ctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } catch (ClusterTopologyException ignored) { U.warn(log, "Failed to send lock reply to remote node because it left grid: " + nodeId); @@ -748,6 +750,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (req.inTx()) { if (tx == null) { tx = new GridDhtTxLocal<>( + ctx.shared(), nearNode.id(), req.version(), req.futureId(), @@ -755,7 +758,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach req.threadId(), req.implicitTx(), req.implicitSingleTx(), - ctx.shared(), + ctx.system(), PESSIMISTIC, req.isolation(), req.timeout(), @@ -1059,7 +1062,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach try { // Don't send reply message to this node or if lock was cancelled. if (!nearNode.id().equals(ctx.nodeId()) && !X.hasCause(err, GridDistributedLockCancelledException.class)) - ctx.io().send(nearNode, res); + ctx.io().send(nearNode, res, ctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } catch (IgniteCheckedException e) { U.error(log, "Failed to send lock reply to originating node (will rollback transaction) [node=" + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index a836ff8..d65de8c 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -26,6 +26,7 @@ import java.util.*; import java.util.concurrent.atomic.*; import static org.gridgain.grid.cache.GridCacheTxState.*; +import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*; /** * @@ -304,6 +305,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.isolation(), commit, tx.isInvalidate(), + tx.system(), tx.isSystemInvalidate(), tx.syncCommit(), tx.syncRollback(), @@ -324,7 +326,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur req.writeVersion(tx.writeVersion()); try { - cctx.io().send(n, req); + cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); if (sync) res = true; @@ -361,6 +363,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.isolation(), commit, tx.isInvalidate(), + tx.system(), tx.isSystemInvalidate(), tx.syncCommit(), tx.syncRollback(), @@ -381,7 +384,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur req.writeVersion(tx.writeVersion()); try { - cctx.io().send(nearMapping.node(), req); + cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); if (sync) res = true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index c5db862..21cccf0 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -92,6 +92,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest * @param isolation Transaction isolation. * @param commit Commit flag. * @param invalidate Invalidate flag. + * @param sys System flag. * @param sysInvalidate System invalidation flag. * @param baseVer Base version. * @param committedVers Committed versions. @@ -115,6 +116,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest GridCacheTxIsolation isolation, boolean commit, boolean invalidate, + boolean sys, boolean sysInvalidate, boolean syncCommit, boolean syncRollback, @@ -131,8 +133,8 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest @Nullable UUID subjId, int taskNameHash ) { - super(xidVer, futId, commitVer, threadId, commit, invalidate, syncCommit, syncRollback, baseVer, committedVers, - rolledbackVers, txSize, writes, recoverWrites, grpLockKey); + super(xidVer, futId, commitVer, threadId, commit, invalidate, sys, syncCommit, syncRollback, baseVer, + committedVers, rolledbackVers, txSize, writes, recoverWrites, grpLockKey); assert miniId != null; assert nearNodeId != null; @@ -322,25 +324,25 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest } switch (commState.idx) { - case 20: + case 21: if (!commState.putEnum(isolation)) return false; commState.idx++; - case 21: + case 22: if (!commState.putGridUuid(miniId)) return false; commState.idx++; - case 22: + case 23: if (!commState.putUuid(nearNodeId)) return false; commState.idx++; - case 23: + case 24: if (nearWritesBytes != null) { if (commState.it == null) { if (!commState.putInt(nearWritesBytes.size())) @@ -367,13 +369,13 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 24: + case 25: if (!commState.putBoolean(onePhaseCommit)) return false; commState.idx++; - case 25: + case 26: if (pendingVers != null) { if (commState.it == null) { if (!commState.putInt(pendingVers.size())) @@ -400,31 +402,31 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 26: + case 27: if (!commState.putBoolean(sysInvalidate)) return false; commState.idx++; - case 27: + case 28: if (!commState.putLong(topVer)) return false; commState.idx++; - case 28: + case 29: if (!commState.putCacheVersion(writeVer)) return false; commState.idx++; - case 29: + case 30: if (!commState.putUuid(subjId)) return false; commState.idx++; - case 30: + case 31: if (!commState.putInt(taskNameHash)) return false; @@ -444,7 +446,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest return false; switch (commState.idx) { - case 20: + case 21: if (buf.remaining() < 1) return false; @@ -454,7 +456,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 21: + case 22: IgniteUuid miniId0 = commState.getGridUuid(); if (miniId0 == GRID_UUID_NOT_READ) @@ -464,7 +466,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 22: + case 23: UUID nearNodeId0 = commState.getUuid(); if (nearNodeId0 == UUID_NOT_READ) @@ -474,7 +476,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 23: + case 24: if (commState.readSize == -1) { if (buf.remaining() < 4) return false; @@ -503,7 +505,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 24: + case 25: if (buf.remaining() < 1) return false; @@ -511,7 +513,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 25: + case 26: if (commState.readSize == -1) { if (buf.remaining() < 4) return false; @@ -540,7 +542,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 26: + case 27: if (buf.remaining() < 1) return false; @@ -548,7 +550,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 27: + case 28: if (buf.remaining() < 8) return false; @@ -556,7 +558,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 28: + case 29: GridCacheVersion writeVer0 = commState.getCacheVersion(); if (writeVer0 == CACHE_VER_NOT_READ) @@ -566,7 +568,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 29: + case 30: UUID subjId0 = commState.getUuid(); if (subjId0 == UUID_NOT_READ) @@ -576,7 +578,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest commState.idx++; - case 30: + case 31: if (buf.remaining() < 4) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java index b53c1c8..9fa40d3 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -26,6 +26,7 @@ import java.util.*; import java.util.concurrent.atomic.*; import static org.gridgain.grid.cache.GridCacheTxState.*; +import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*; import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*; /** @@ -87,6 +88,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements * @param txNodes Transaction nodes mapping. */ public GridDhtTxLocal( + GridCacheSharedContext<K, V> cctx, UUID nearNodeId, GridCacheVersion nearXidVer, IgniteUuid nearFutId, @@ -94,7 +96,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements long nearThreadId, boolean implicit, boolean implicitSingle, - GridCacheSharedContext<K, V> cctx, + boolean sys, GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation, long timeout, @@ -108,10 +110,11 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements int taskNameHash ) { super( + cctx, cctx.versions().onReceivedAndNext(nearNodeId, nearXidVer), implicit, implicitSingle, - cctx, + sys, concurrency, isolation, timeout, @@ -625,7 +628,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements nearFinMiniId, err); try { - cctx.io().send(nearNodeId, res); + cctx.io().send(nearNodeId, res, system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } catch (ClusterTopologyException ignored) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 461ea04..035f9a2 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -69,6 +69,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte * @param implicit Implicit flag. * @param implicitSingle Implicit-with-single-key flag. * @param cctx Cache context. + * @param sys System flag. * @param concurrency Concurrency. * @param isolation Isolation. * @param timeout Timeout. @@ -77,10 +78,11 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte * @param partLock If this is a group-lock transaction and the whole partition should be locked. */ protected GridDhtTxLocalAdapter( + GridCacheSharedContext<K, V> cctx, GridCacheVersion xidVer, boolean implicit, boolean implicitSingle, - GridCacheSharedContext<K, V> cctx, + boolean sys, GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation, long timeout, @@ -92,8 +94,8 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte @Nullable UUID subjId, int taskNameHash ) { - super(cctx, xidVer, implicit, implicitSingle, concurrency, isolation, timeout, invalidate, storeEnabled, txSize, - grpLockKey, partLock, subjId, taskNameHash); + super(cctx, xidVer, implicit, implicitSingle, sys, concurrency, isolation, timeout, invalidate, storeEnabled, + txSize, grpLockKey, partLock, subjId, taskNameHash); assert cctx != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index f872cf9..84c8f0f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.*; import static org.gridgain.grid.cache.GridCacheTxState.*; import static org.apache.ignite.events.IgniteEventType.*; +import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*; /** * @@ -276,7 +277,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu nearMiniId, tx.xidVersion(), Collections.<Integer>emptySet(), t); try { - cctx.io().send(tx.nearNodeId(), res); + cctx.io().send(tx.nearNodeId(), res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } catch (IgniteCheckedException e) { U.error(log, "Failed to send reply to originating near node (will rollback): " + tx.nearNodeId(), e); @@ -386,7 +387,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu res.pending(localDhtPendingVersions(tx.writeEntries(), min)); - cctx.io().send(tx.nearNodeId(), res); + cctx.io().send(tx.nearNodeId(), res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } return true; @@ -676,7 +677,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu //noinspection TryWithIdenticalCatches try { - cctx.io().send(n, req); + cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } catch (ClusterTopologyException e) { fut.onResult(e); @@ -730,7 +731,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu //noinspection TryWithIdenticalCatches try { - cctx.io().send(nearMapping.node(), req); + cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } catch (ClusterTopologyException e) { fut.onResult(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index 61dfe70..e6f9051 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@ -352,37 +352,37 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque } switch (commState.idx) { - case 21: + case 22: if (!commState.putGridUuid(futId)) return false; commState.idx++; - case 22: + case 23: if (!commState.putBitSet(invalidateNearEntries)) return false; commState.idx++; - case 23: + case 24: if (!commState.putBoolean(last)) return false; commState.idx++; - case 24: + case 25: if (!commState.putGridUuid(miniId)) return false; commState.idx++; - case 25: + case 26: if (!commState.putUuid(nearNodeId)) return false; commState.idx++; - case 26: + case 27: if (nearWritesBytes != null) { if (commState.it == null) { if (!commState.putInt(nearWritesBytes.size())) @@ -409,37 +409,37 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque commState.idx++; - case 27: + case 28: if (!commState.putCacheVersion(nearXidVer)) return false; commState.idx++; - case 28: + case 29: if (!commState.putByteArray(ownedBytes)) return false; commState.idx++; - case 29: + case 30: if (!commState.putLong(topVer)) return false; commState.idx++; - case 30: + case 31: if (!commState.putUuid(subjId)) return false; commState.idx++; - case 31: + case 32: if (!commState.putInt(taskNameHash)) return false; commState.idx++; - case 32: + case 33: if (!commState.putBitSet(preloadKeys)) return false; @@ -459,7 +459,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque return false; switch (commState.idx) { - case 21: + case 22: IgniteUuid futId0 = commState.getGridUuid(); if (futId0 == GRID_UUID_NOT_READ) @@ -469,7 +469,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque commState.idx++; - case 22: + case 23: BitSet invalidateNearEntries0 = commState.getBitSet(); if (invalidateNearEntries0 == BIT_SET_NOT_READ) @@ -479,7 +479,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque commState.idx++; - case 23: + case 24: if (buf.remaining() < 1) return false; @@ -487,7 +487,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque commState.idx++; - case 24: + case 25: IgniteUuid miniId0 = commState.getGridUuid(); if (miniId0 == GRID_UUID_NOT_READ) @@ -497,7 +497,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque commState.idx++; - case 25: + case 26: UUID nearNodeId0 = commState.getUuid(); if (nearNodeId0 == UUID_NOT_READ) @@ -507,7 +507,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque commState.idx++; - case 26: + case 27: if (commState.readSize == -1) { if (buf.remaining() < 4) return false; @@ -536,7 +536,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque commState.idx++; - case 27: + case 28: GridCacheVersion nearXidVer0 = commState.getCacheVersion(); if (nearXidVer0 == CACHE_VER_NOT_READ) @@ -546,7 +546,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque commState.idx++; - case 28: + case 29: byte[] ownedBytes0 = commState.getByteArray(); if (ownedBytes0 == BYTE_ARR_NOT_READ) @@ -556,7 +556,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque commState.idx++; - case 29: + case 30: if (buf.remaining() < 8) return false; @@ -564,7 +564,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque commState.idx++; - case 30: + case 31: UUID subjId0 = commState.getUuid(); if (subjId0 == UUID_NOT_READ) @@ -574,7 +574,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque commState.idx++; - case 31: + case 32: if (buf.remaining() < 4) return false; @@ -582,7 +582,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque commState.idx++; - case 32: + case 33: BitSet preloadKeys0 = commState.getBitSet(); if (preloadKeys0 == BIT_SET_NOT_READ) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java index 3bc41b2..e71b0fb 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -59,6 +59,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> * @param topVer Topology version. * @param xidVer XID version. * @param commitVer Commit version. + * @param sys System flag. * @param concurrency Concurrency level (should be pessimistic). * @param isolation Transaction isolation. * @param invalidate Invalidate flag. @@ -70,6 +71,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> * @param txNodes Transaction nodes mapping. */ public GridDhtTxRemote( + GridCacheSharedContext<K, V> ctx, UUID nearNodeId, IgniteUuid rmtFutId, UUID nodeId, @@ -77,11 +79,11 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> long topVer, GridCacheVersion xidVer, GridCacheVersion commitVer, + boolean sys, GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation, boolean invalidate, long timeout, - GridCacheSharedContext<K, V> ctx, int txSize, @Nullable GridCacheTxKey grpLockKey, GridCacheVersion nearXidVer, @@ -89,7 +91,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> @Nullable UUID subjId, int taskNameHash ) { - super(ctx, nodeId, rmtThreadId, xidVer, commitVer, concurrency, isolation, invalidate, timeout, txSize, + super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize, grpLockKey, subjId, taskNameHash); assert nearNodeId != null; @@ -118,6 +120,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> * @param topVer Topology version. * @param xidVer XID version. * @param commitVer Commit version. + * @param sys System flag. * @param concurrency Concurrency level (should be pessimistic). * @param isolation Transaction isolation. * @param invalidate Invalidate flag. @@ -127,6 +130,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> * @param grpLockKey Group lock key if transaction is group-lock. */ public GridDhtTxRemote( + GridCacheSharedContext<K, V> ctx, UUID nearNodeId, IgniteUuid rmtFutId, UUID nodeId, @@ -135,17 +139,17 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> long topVer, GridCacheVersion xidVer, GridCacheVersion commitVer, + boolean sys, GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation, boolean invalidate, long timeout, - GridCacheSharedContext<K, V> ctx, int txSize, @Nullable GridCacheTxKey grpLockKey, @Nullable UUID subjId, int taskNameHash ) { - super(ctx, nodeId, rmtThreadId, xidVer, commitVer, concurrency, isolation, invalidate, timeout, txSize, + super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize, grpLockKey, subjId, taskNameHash); assert nearNodeId != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 535f46e..baa7ead 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -32,6 +32,7 @@ import java.util.*; import java.util.concurrent.atomic.*; import static org.apache.ignite.events.IgniteEventType.*; +import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*; /** * Colocated cache lock future. @@ -832,7 +833,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity if (log.isDebugEnabled()) log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); - cctx.io().send(node, req); + cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } catch (ClusterTopologyException ex) { assert fut != null; @@ -847,7 +848,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity if (log.isDebugEnabled()) log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); - cctx.io().send(node, req); + cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } catch (ClusterTopologyException ex) { assert fut != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java index b67229d..f0ce36b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java @@ -31,6 +31,7 @@ import java.util.*; import java.util.concurrent.atomic.*; import static org.apache.ignite.events.IgniteEventType.*; +import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*; /** * Cache lock future. @@ -1099,7 +1100,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B if (log.isDebugEnabled()) log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); - cctx.io().send(node, req); + cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } catch (ClusterTopologyException ex) { assert fut != null; @@ -1114,7 +1115,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B if (log.isDebugEnabled()) log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); - cctx.io().send(node, req); + cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } catch (ClusterTopologyException ex) { assert fut != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java index 9b4d117..7a71452 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -247,17 +247,18 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> if (tx == null) { tx = new GridNearTxRemote<>( + ctx.shared(), nodeId, req.nearNodeId(), req.nearXidVersion(), req.threadId(), req.version(), null, + ctx.system(), PESSIMISTIC, req.isolation(), req.isInvalidate(), req.timeout(), - ctx.shared(), req.txSize(), req.groupLockKey(), req.subjectId(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 176fdd0..4a438bf 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.near; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.*; @@ -28,6 +27,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; import static org.gridgain.grid.cache.GridCacheTxState.*; +import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*; import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*; /** @@ -338,6 +338,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu tx.threadId(), commit, tx.isInvalidate(), + tx.system(), tx.syncCommit(), tx.syncRollback(), m.explicitLock(), @@ -373,7 +374,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu cctx.tm().beforeFinishRemote(n.id(), tx.threadId()); try { - cctx.io().send(n, req); + cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); // If we don't wait for result, then mark future as done. if (!isSync() && !m.explicitLock()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java index 90cdfe6..76976e8 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java @@ -58,6 +58,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques * @param threadId Thread ID. * @param commit Commit flag. * @param invalidate Invalidate flag. + * @param sys System flag. * @param explicitLock Explicit lock flag. * @param topVer Topology version. * @param baseVer Base version. @@ -73,6 +74,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques long threadId, boolean commit, boolean invalidate, + boolean sys, boolean syncCommit, boolean syncRollback, boolean explicitLock, @@ -85,7 +87,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques Collection<GridCacheTxEntry<K, V>> recoverEntries, @Nullable UUID subjId, int taskNameHash) { - super(xidVer, futId, null, threadId, commit, invalidate, syncCommit, syncRollback, baseVer, committedVers, + super(xidVer, futId, null, threadId, commit, invalidate, sys, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, txSize, writeEntries, recoverEntries, null); this.explicitLock = explicitLock; @@ -175,31 +177,31 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques } switch (commState.idx) { - case 20: + case 21: if (!commState.putBoolean(explicitLock)) return false; commState.idx++; - case 21: + case 22: if (!commState.putGridUuid(miniId)) return false; commState.idx++; - case 22: + case 23: if (!commState.putLong(topVer)) return false; commState.idx++; - case 23: + case 24: if (!commState.putUuid(subjId)) return false; commState.idx++; - case 24: + case 25: if (!commState.putInt(taskNameHash)) return false; @@ -219,7 +221,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques return false; switch (commState.idx) { - case 20: + case 21: if (buf.remaining() < 1) return false; @@ -227,7 +229,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques commState.idx++; - case 21: + case 22: IgniteUuid miniId0 = commState.getGridUuid(); if (miniId0 == GRID_UUID_NOT_READ) @@ -237,7 +239,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques commState.idx++; - case 22: + case 23: if (buf.remaining() < 8) return false; @@ -245,7 +247,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques commState.idx++; - case 23: + case 24: UUID subjId0 = commState.getUuid(); if (subjId0 == UUID_NOT_READ) @@ -255,7 +257,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques commState.idx++; - case 24: + case 25: if (buf.remaining() < 4) return false;