IGNITE-141 - Marshallers refactoring
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0fd5967f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0fd5967f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0fd5967f Branch: refs/heads/ignite-141 Commit: 0fd5967f471228edf89a87e7370e7e2512108f12 Parents: 74078f6 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Tue Mar 3 16:15:22 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Tue Mar 3 16:15:22 2015 -0800 ---------------------------------------------------------------------- .../ignite/internal/GridKernalContext.java | 9 +- .../ignite/internal/GridKernalContextImpl.java | 10 + .../apache/ignite/internal/IgniteKernal.java | 2 + .../org/apache/ignite/internal/IgnitionEx.java | 18 +- .../managers/communication/GridIoManager.java | 13 +- .../managers/communication/GridIoPolicy.java | 5 +- .../processors/cache/GridCacheContext.java | 2 +- .../GridDistributedTxFinishRequest.java | 28 +- .../GridDistributedTxPrepareRequest.java | 12 + .../GridDistributedTxRemoteAdapter.java | 4 + .../dht/GridDhtTransactionalCacheAdapter.java | 2 + .../distributed/dht/GridDhtTxFinishFuture.java | 2 + .../distributed/dht/GridDhtTxFinishRequest.java | 4 +- .../cache/distributed/dht/GridDhtTxLocal.java | 3 + .../distributed/dht/GridDhtTxLocalAdapter.java | 6 +- .../cache/distributed/dht/GridDhtTxRemote.java | 11 +- .../near/GridNearTransactionalCache.java | 1 + .../near/GridNearTxFinishFuture.java | 1 + .../near/GridNearTxFinishRequest.java | 6 +- .../cache/distributed/near/GridNearTxLocal.java | 3 + .../distributed/near/GridNearTxRemote.java | 11 +- .../processors/cache/local/GridLocalTx.java | 206 ----------- .../cache/local/GridLocalTxFuture.java | 351 ------------------- .../cache/transactions/IgniteTxAdapter.java | 12 +- .../cache/transactions/IgniteTxHandler.java | 6 +- .../transactions/IgniteTxLocalAdapter.java | 7 +- .../cache/transactions/IgniteTxManager.java | 2 + .../junits/GridTestKernalContext.java | 1 + 28 files changed, 152 insertions(+), 586 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index f1a135f..bd6d3be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -287,11 +287,18 @@ public interface GridKernalContext extends Iterable<GridComponent> { /** * Gets utility cache pool. * - * @return DR pool. + * @return Utility cache pool. */ public ExecutorService utilityCachePool(); /** + * Gets marshaller cache pool. + * + * @return Marshaller cache pool. + */ + public ExecutorService marshallerCachePool(); + + /** * Gets portable processor. * * @return Portable processor. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 8544c60..b63d65b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -288,6 +288,9 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable private ExecutorService utilityCachePool; /** */ + private ExecutorService marshCachePool; + + /** */ private IgniteConfiguration cfg; /** */ @@ -338,6 +341,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable GridKernalGateway gw, IgniteExceptionRegistry registry, ExecutorService utilityCachePool, + ExecutorService marshCachePool, ExecutorService execSvc, ExecutorService sysExecSvc, ExecutorService p2pExecSvc, @@ -353,6 +357,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable this.gw = gw; this.registry = registry; this.utilityCachePool = utilityCachePool; + this.marshCachePool = marshCachePool; this.execSvc = execSvc; this.sysExecSvc = sysExecSvc; this.p2pExecSvc = p2pExecSvc; @@ -695,6 +700,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public ExecutorService marshallerCachePool() { + return marshCachePool; + } + + /** {@inheritDoc} */ @Override public GridPortableProcessor portable() { return portableProc; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index a6a5bde..c682a80 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -547,6 +547,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { @SuppressWarnings({"CatchGenericClass", "unchecked"}) public void start(final IgniteConfiguration cfg, ExecutorService utilityCachePool, + ExecutorService marshCachePool, final ExecutorService execSvc, final ExecutorService sysExecSvc, ExecutorService p2pExecSvc, @@ -668,6 +669,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { gw, new IgniteExceptionRegistry(log), utilityCachePool, + marshCachePool, execSvc, sysExecSvc, p2pExecSvc, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 7844522..d818381 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -1165,6 +1165,9 @@ public class IgnitionEx { /** Utility cache executor service. */ private ExecutorService utilityCacheExecSvc; + /** Marshaller cache executor service. */ + private ExecutorService marshCacheExecSvc; + /** Grid state. */ private volatile IgniteState state = STOPPED; @@ -1385,6 +1388,13 @@ public class IgnitionEx { DFLT_SYSTEM_KEEP_ALIVE_TIME, new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP)); + marshCacheExecSvc = new IgniteThreadPoolExecutor( + "marshaller-cache-" + cfg.getGridName(), + DFLT_SYSTEM_CORE_THREAD_CNT, + DFLT_SYSTEM_MAX_THREAD_CNT, + DFLT_SYSTEM_KEEP_ALIVE_TIME, + new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP)); + // Register Ignite MBean for current grid instance. registerFactoryMbean(myCfg.getMBeanServer()); @@ -1396,8 +1406,8 @@ public class IgnitionEx { // Init here to make grid available to lifecycle listeners. grid = grid0; - grid0.start(myCfg, utilityCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, igfsExecSvc, - restExecSvc, + grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, + igfsExecSvc, restExecSvc, new CA() { @Override public void apply() { startLatch.countDown(); @@ -2046,6 +2056,10 @@ public class IgnitionEx { U.shutdownNow(getClass(), utilityCacheExecSvc, log); utilityCacheExecSvc = null; + + U.shutdownNow(getClass(), marshCacheExecSvc, log); + + marshCacheExecSvc = null; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 968e93a..ca84cb1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -83,6 +83,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** Utility cache pool. */ private ExecutorService utilityCachePool; + /** Marshaller cache pool. */ + private ExecutorService marshCachePool; + /** Discovery listener. */ private GridLocalEventListener discoLsnr; @@ -188,6 +191,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa sysPool = ctx.getSystemExecutorService(); mgmtPool = ctx.getManagementExecutorService(); utilityCachePool = ctx.utilityCachePool(); + marshCachePool = ctx.marshallerCachePool(); affPool = Executors.newFixedThreadPool(1); getSpi().setListener(commLsnr = new CommunicationListener<Serializable>() { @@ -498,7 +502,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa case SYSTEM_POOL: case MANAGEMENT_POOL: case AFFINITY_POOL: - case UTILITY_CACHE_POOL: { + case UTILITY_CACHE_POOL: + case MARSH_CACHE_POOL: { if (msg.isOrdered()) processOrderedMessage(nodeId, msg, plc, msgC); else @@ -534,11 +539,17 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return mgmtPool; case AFFINITY_POOL: return affPool; + case UTILITY_CACHE_POOL: assert utilityCachePool != null : "Utility cache pool is not configured."; return utilityCachePool; + case MARSH_CACHE_POOL: + assert marshCachePool != null : "Marshaller cache pool is not configured."; + + return marshCachePool; + default: { assert false : "Invalid communication policy: " + plc; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java index 78ceab4..6e45043 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java @@ -40,7 +40,10 @@ public enum GridIoPolicy { AFFINITY_POOL, /** Utility cache execution pool. */ - UTILITY_CACHE_POOL; + UTILITY_CACHE_POOL, + + /** Marshaller cache execution pool. */ + MARSH_CACHE_POOL; /** Enum values. */ private static final GridIoPolicy[] VALS = values(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index f2e71ab..38b58d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -314,7 +314,7 @@ public class GridCacheContext<K, V> implements Externalizable { sys = ctx.cache().systemCache(cacheName); - plc = sys ? UTILITY_CACHE_POOL : SYSTEM_POOL; + plc = CU.isMarshallerCache(cacheName) ? MARSH_CACHE_POOL : sys ? UTILITY_CACHE_POOL : SYSTEM_POOL; Factory<ExpiryPolicy> factory = cacheCfg.getExpiryPolicyFactory(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java index 8f954e8..cb5968d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -73,9 +74,12 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes /** Group lock key bytes. */ private byte[] grpLockKeyBytes; - /** System flag. */ + /** System transaction flag. */ private boolean sys; + /** IO policy. */ + private GridIoPolicy plc; + /** * Empty constructor required by {@link Externalizable}. */ @@ -90,7 +94,8 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes * @param commitVer Commit version. * @param commit Commit flag. * @param invalidate Invalidate flag. - * @param sys System flag. + * @param sys System transaction flag. + * @param plc IO policy. * @param baseVer Base version. * @param committedVers Committed versions. * @param rolledbackVers Rolled back versions. @@ -105,6 +110,7 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes boolean commit, boolean invalidate, boolean sys, + GridIoPolicy plc, boolean syncCommit, boolean syncRollback, GridCacheVersion baseVer, @@ -122,6 +128,7 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes this.commit = commit; this.invalidate = invalidate; this.sys = sys; + this.plc = plc; this.syncCommit = syncCommit; this.syncRollback = syncRollback; this.baseVer = baseVer; @@ -132,13 +139,20 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes } /** - * @return System flag. + * @return System transaction flag. */ public boolean system() { return sys; } /** + * @return IO policy. + */ + public GridIoPolicy policy() { + return plc; + } + + /** * @return Future ID. */ public IgniteUuid futureId() { @@ -309,7 +323,7 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes writer.incrementState(); case 16: - if (!writer.writeBoolean("sys", sys)) + if (!writer.writeByte("sys", plc != null ? (byte)plc.ordinal() : -1)) return false; writer.incrementState(); @@ -407,11 +421,15 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes reader.incrementState(); case 16: - sys = reader.readBoolean("sys"); + byte plcOrd; + + plcOrd = reader.readByte("plc"); if (!reader.isLastRead()) return false; + plc = GridIoPolicy.fromOrdinal(plcOrd); + reader.incrementState(); case 17: http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index 8f3742b..6dced98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -119,6 +120,9 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe /** System flag. */ private boolean sys; + /** IO policy. */ + private GridIoPolicy plc; + /** * Required by {@link Externalizable}. */ @@ -154,6 +158,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe invalidate = tx.isInvalidate(); txSize = tx.size(); sys = tx.system(); + plc = tx.ioPolicy(); this.reads = reads; this.writes = writes; @@ -178,6 +183,13 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe } /** + * @return IO policy. + */ + public GridIoPolicy policy() { + return plc; + } + + /** * Adds version to be verified on remote node. * * @param key Key for which version is verified. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index dbf82dd..02c4b97 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@ -88,6 +89,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V> * @param xidVer XID version. * @param commitVer Commit version. * @param sys System flag. + * @param plc IO policy. * @param concurrency Concurrency level (should be pessimistic). * @param isolation Transaction isolation. * @param invalidate Invalidate flag. @@ -104,6 +106,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V> GridCacheVersion xidVer, GridCacheVersion commitVer, boolean sys, + GridIoPolicy plc, TransactionConcurrency concurrency, TransactionIsolation isolation, boolean invalidate, @@ -120,6 +123,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V> ctx.versions().last(), Thread.currentThread().getId(), sys, + plc, concurrency, isolation, timeout, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 3fa0b89..ca99241 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -197,6 +197,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach req.version(), /*commitVer*/null, ctx.system(), + ctx.ioPolicy(), PESSIMISTIC, req.isolation(), req.isInvalidate(), @@ -790,6 +791,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach req.implicitTx(), req.implicitSingleTx(), ctx.system(), + ctx.ioPolicy(), PESSIMISTIC, req.isolation(), req.timeout(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 7dac17b..38705df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -318,6 +318,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur commit, tx.isInvalidate(), tx.system(), + tx.ioPolicy(), tx.isSystemInvalidate(), tx.syncCommit(), tx.syncRollback(), @@ -369,6 +370,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur commit, tx.isInvalidate(), tx.system(), + tx.ioPolicy(), tx.isSystemInvalidate(), tx.syncCommit(), tx.syncRollback(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index 2835844..4e84426 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; @@ -111,6 +112,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest boolean commit, boolean invalidate, boolean sys, + GridIoPolicy plc, boolean sysInvalidate, boolean syncCommit, boolean syncRollback, @@ -123,7 +125,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest @Nullable UUID subjId, int taskNameHash ) { - super(xidVer, futId, commitVer, threadId, commit, invalidate, sys, syncCommit, syncRollback, baseVer, + super(xidVer, futId, commitVer, threadId, commit, invalidate, sys, plc, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, txSize, grpLockKey); assert miniId != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 6aa159c..a77b560 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; @@ -105,6 +106,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements boolean implicit, boolean implicitSingle, boolean sys, + GridIoPolicy plc, TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout, @@ -123,6 +125,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements implicit, implicitSingle, sys, + plc, concurrency, isolation, timeout, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 1c71f12..eb5c356 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@ -95,6 +96,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K boolean implicit, boolean implicitSingle, boolean sys, + GridIoPolicy plc, TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout, @@ -106,8 +108,8 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K @Nullable UUID subjId, int taskNameHash ) { - super(cctx, xidVer, implicit, implicitSingle, sys, concurrency, isolation, timeout, invalidate, storeEnabled, - txSize, grpLockKey, partLock, subjId, taskNameHash); + super(cctx, xidVer, implicit, implicitSingle, sys, plc, concurrency, isolation, timeout, invalidate, + storeEnabled, txSize, grpLockKey, partLock, subjId, taskNameHash); assert cctx != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index 506888b..d818c1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@ -89,6 +90,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> GridCacheVersion xidVer, GridCacheVersion commitVer, boolean sys, + GridIoPolicy plc, TransactionConcurrency concurrency, TransactionIsolation isolation, boolean invalidate, @@ -100,8 +102,8 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> @Nullable UUID subjId, int taskNameHash ) { - super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize, - grpLockKey, subjId, taskNameHash); + super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout, + txSize, grpLockKey, subjId, taskNameHash); assert nearNodeId != null; assert rmtFutId != null; @@ -149,6 +151,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> GridCacheVersion xidVer, GridCacheVersion commitVer, boolean sys, + GridIoPolicy plc, TransactionConcurrency concurrency, TransactionIsolation isolation, boolean invalidate, @@ -158,8 +161,8 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> @Nullable UUID subjId, int taskNameHash ) { - super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize, - grpLockKey, subjId, taskNameHash); + super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout, + txSize, grpLockKey, subjId, taskNameHash); assert nearNodeId != null; assert rmtFutId != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 6255588..c25a7a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -292,6 +292,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> req.version(), null, ctx.system(), + ctx.ioPolicy(), PESSIMISTIC, req.isolation(), req.isInvalidate(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index f3811c6..dddeb23 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -350,6 +350,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu commit, tx.isInvalidate(), tx.system(), + tx.ioPolicy(), tx.syncCommit(), tx.syncRollback(), m.explicitLock(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java index f29cfea..1ac8aed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.tostring.*; @@ -82,6 +83,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques boolean commit, boolean invalidate, boolean sys, + GridIoPolicy plc, boolean syncCommit, boolean syncRollback, boolean explicitLock, @@ -93,8 +95,8 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques int txSize, @Nullable UUID subjId, int taskNameHash) { - super(xidVer, futId, null, threadId, commit, invalidate, sys, syncCommit, syncRollback, baseVer, committedVers, - rolledbackVers, txSize, null); + super(xidVer, futId, null, threadId, commit, invalidate, sys, plc, syncCommit, syncRollback, baseVer, + committedVers, rolledbackVers, txSize, null); this.explicitLock = explicitLock; this.storeEnabled = storeEnabled; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 06e4767..7a5f9d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; @@ -113,6 +114,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { boolean implicit, boolean implicitSingle, boolean sys, + GridIoPolicy plc, TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout, @@ -130,6 +132,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> { implicit, implicitSingle, sys, + plc, concurrency, isolation, timeout, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java index 5f9a0b7..1c69548 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; @@ -92,6 +93,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> GridCacheVersion xidVer, GridCacheVersion commitVer, boolean sys, + GridIoPolicy plc, TransactionConcurrency concurrency, TransactionIsolation isolation, boolean invalidate, @@ -102,8 +104,8 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> @Nullable UUID subjId, int taskNameHash ) throws IgniteCheckedException { - super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize, - grpLockKey, subjId, taskNameHash); + super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout, + txSize, grpLockKey, subjId, taskNameHash); assert nearNodeId != null; @@ -149,6 +151,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> GridCacheVersion xidVer, GridCacheVersion commitVer, boolean sys, + GridIoPolicy plc, TransactionConcurrency concurrency, TransactionIsolation isolation, boolean invalidate, @@ -158,8 +161,8 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V> @Nullable UUID subjId, int taskNameHash ) { - super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize, - grpLockKey, subjId, taskNameHash); + super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout, + txSize, grpLockKey, subjId, taskNameHash); assert nearNodeId != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTx.java deleted file mode 100644 index 6727f7e..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTx.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.local; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.transactions.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.transactions.TransactionState.*; - -/** - * Local cache transaction. - */ -class GridLocalTx<K, V> extends IgniteTxLocalAdapter<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Transaction future. */ - private final AtomicReference<GridLocalTxFuture<K, V>> fut = new AtomicReference<>(); - - /** - * Empty constructor required for {@link Externalizable}. - */ - public GridLocalTx() { - // No-op. - } - - /** - * @param ctx Cache registry. - * @param implicit {@code True} if transaction is implicitly created by the system, - * {@code false} if user explicitly created the transaction. - * @param implicitSingle Implicit with single kye flag. - * @param concurrency Concurrency. - * @param isolation Isolation. - * @param timeout Timeout. - */ - GridLocalTx( - GridCacheSharedContext<K, V> ctx, - boolean implicit, - boolean implicitSingle, - TransactionConcurrency concurrency, - TransactionIsolation isolation, - long timeout, - int txSize, - @Nullable UUID subjId, - int taskNameHash - ) { - super(ctx, ctx.versions().next(), implicit, implicitSingle, false, concurrency, isolation, timeout, false, true, - txSize, null, false, subjId, taskNameHash); - } - - /** {@inheritDoc} */ - @Override public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) { - GridLocalTxFuture<K, V> fut = this.fut.get(); - - return fut != null && fut.onOwnerChanged(entry, owner); - } - - /** {@inheritDoc} */ - @Override public void prepare() throws IgniteCheckedException { - if (!state(PREPARING)) { - TransactionState state = state(); - - // If other thread is doing "prepare", then no-op. - if (state == PREPARING || state == PREPARED || state == COMMITTING || state == COMMITTED) - return; - - setRollbackOnly(); - - throw new IgniteCheckedException("Invalid transaction state for prepare [state=" + state + ", tx=" + this + ']'); - } - - try { - userPrepare(); - - state(PREPARED); - } - catch (IgniteCheckedException e) { - setRollbackOnly(); - - throw e; - } - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<IgniteInternalTx<K, V>> prepareAsync() { - try { - prepare(); - - return new GridFinishedFuture<IgniteInternalTx<K, V>>(cctx.kernalContext(), this); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(cctx.kernalContext(), e); - } - } - - /** - * Commits without prepare. - * - * @throws IgniteCheckedException If commit failed. - */ - void commit0() throws IgniteCheckedException { - if (state(COMMITTING)) { - try { - userCommit(); - } - finally { - if (!done()) { - if (isRollbackOnly()) { - state(ROLLING_BACK); - - userRollback(); - - state(ROLLED_BACK); - } - else - state(COMMITTED); - } - } - } - } - - /** {@inheritDoc} */ - @SuppressWarnings( {"unchecked", "RedundantCast"}) - @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() { - try { - prepare(); - } - catch (IgniteCheckedException e) { - state(UNKNOWN); - - return new GridFinishedFuture<>(cctx.kernalContext(), e); - } - - GridLocalTxFuture<K, V> fut = this.fut.get(); - - if (fut == null) { - if (this.fut.compareAndSet(null, fut = new GridLocalTxFuture<>(cctx, this))) { - cctx.mvcc().addFuture(fut); - - fut.checkLocks(); - - return (IgniteInternalFuture)fut; - } - } - - return (IgniteInternalFuture)this.fut.get(); - } - - /** {@inheritDoc} */ - @Override public void rollback() throws IgniteCheckedException { - rollbackAsync().get(); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() { - try { - state(ROLLING_BACK); - - userRollback(); - - state(ROLLED_BACK); - - return new GridFinishedFuture<IgniteInternalTx>(cctx.kernalContext(), this); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(cctx.kernalContext(), e); - } - } - - /** {@inheritDoc} */ - @Override public boolean finish(boolean commit) throws IgniteCheckedException { - assert false; - - return false; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return GridToStringBuilder.toString(GridLocalTx.class, this, "super", super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java deleted file mode 100644 index 66a5eb2..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java +++ /dev/null @@ -1,351 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.local; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.transactions.*; -import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.internal.transactions.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.transactions.TransactionState.*; - -/** - * Replicated cache transaction future. - */ -final class GridLocalTxFuture<K, V> extends GridFutureAdapter<IgniteInternalTx<K, V>> - implements GridCacheMvccFuture<K, V, IgniteInternalTx<K, V>> { - /** */ - private static final long serialVersionUID = 0L; - - /** Logger reference. */ - private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); - - /** Future ID. */ - private IgniteUuid futId = IgniteUuid.randomUuid(); - - /** Cache. */ - @GridToStringExclude - private GridCacheSharedContext<K, V> cctx; - - /** Cache transaction. */ - @GridToStringExclude // Need to exclude due to circular dependencies. - private GridLocalTx<K, V> tx; - - /** Error. */ - private AtomicReference<Throwable> err = new AtomicReference<>(null); - - /** Commit flag. */ - private AtomicBoolean commit = new AtomicBoolean(false); - - /** Logger. */ - @GridToStringExclude - private IgniteLogger log; - - /** Trackable flag. */ - private boolean trackable = true; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridLocalTxFuture() { - // No-op. - } - - /** - * @param cctx Context. - * @param tx Cache transaction. - */ - GridLocalTxFuture( - GridCacheSharedContext<K, V> cctx, - GridLocalTx<K, V> tx) { - super(cctx.kernalContext()); - - assert cctx != null; - assert tx != null; - - this.cctx = cctx; - this.tx = tx; - - log = U.logger(ctx, logRef, GridLocalTxFuture.class); - } - - /** {@inheritDoc} */ - @Override public IgniteUuid futureId() { - return futId; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion version() { - return tx.xidVersion(); - } - - /** {@inheritDoc} */ - @Override public Collection<? extends ClusterNode> nodes() { - return Collections.emptyList(); - } - - /** {@inheritDoc} */ - @Override public boolean onNodeLeft(UUID nodeId) { - // No-op. - return false; - } - - /** {@inheritDoc} */ - @Override public boolean trackable() { - return trackable; - } - - /** {@inheritDoc} */ - @Override public void markNotTrackable() { - trackable = false; - } - - /** - * @return Lock version. - */ - GridLocalTx<K, V> tx() { - return tx; - } - - /** - * - */ - void complete() { - onComplete(); - } - - /** - * @param e Error. - */ - void onError(Throwable e) { - if (err.compareAndSet(null, e)) { - tx.setRollbackOnly(); - - onComplete(); - } - } - - /** - * @param e Error. - */ - @SuppressWarnings({"TypeMayBeWeakened"}) - void onError(IgniteTxOptimisticCheckedException e) { - if (err.compareAndSet(null, e)) { - tx.setRollbackOnly(); - - onComplete(); - } - } - - /** - * @param e Error. - */ - @SuppressWarnings({"TypeMayBeWeakened"}) - void onError(IgniteTxRollbackCheckedException e) { - if (err.compareAndSet(null, e)) { - // Attempt rollback. - if (tx.setRollbackOnly()) { - try { - tx.rollback(); - } - catch (IgniteCheckedException ex) { - U.error(log, "Failed to rollback the transaction: " + tx, ex); - } - } - - onComplete(); - } - } - - /** - * Callback for whenever all replies are received. - */ - @SuppressWarnings({"ThrowableInstanceNeverThrown"}) - void checkLocks() { - for (IgniteTxEntry<K, V> txEntry : tx.writeMap().values()) { - while (true) { - try { - GridCacheEntryEx<K, V> entry = txEntry.cached(); - - if (entry == null) { - onError(new IgniteTxRollbackCheckedException("Failed to find cache entry for " + - "transaction key (will rollback) [key=" + txEntry.key() + ", tx=" + tx + ']')); - - break; - } - - // Another thread or transaction owns some lock. - if (!entry.lockedByThread(tx.threadId())) { - if (tx.pessimistic()) - onError(new IgniteCheckedException("Pessimistic transaction does not own lock for commit: " + tx)); - - if (log.isDebugEnabled()) - log.debug("Transaction entry is not locked by transaction (will wait) [entry=" + entry + - ", tx=" + tx + ']'); - - return; - } - - break; // While. - } - // If entry cached within transaction got removed before lock. - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in checkLocks method (will retry): " + txEntry); - - txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes()); - } - } - } - - commit(); - } - - /** - * - * @param entry Entry. - * @param owner Owner. - */ - @SuppressWarnings({"ThrowableInstanceNeverThrown"}) - @Override public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) { - if (log.isDebugEnabled()) - log.debug("Transaction future received owner changed callback [owner=" + owner + ", entry=" + entry + ']'); - - for (IgniteTxEntry<K, V> txEntry : tx.writeMap().values()) { - while (true) { - try { - GridCacheEntryEx<K,V> cached = txEntry.cached(); - - if (entry == null) { - onError(new IgniteTxRollbackCheckedException("Failed to find cache entry for " + - "transaction key (will rollback) [key=" + txEntry.key() + ", tx=" + tx + ']')); - - return true; - } - - // Don't compare entry against itself. - if (cached != entry && !cached.lockedLocally(tx.xidVersion())) { - if (log.isDebugEnabled()) - log.debug("Transaction entry is not locked by transaction (will wait) [entry=" + entry + - ", tx=" + tx + ']'); - - return true; - } - - break; - } - // If entry cached within transaction got removed before lock. - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in onOwnerChanged method (will retry): " + txEntry); - - txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes()); - } - } - } - - commit(); - - return false; - } - - /** - * Callback invoked when all locks succeeded. - */ - @SuppressWarnings({"ThrowableInstanceNeverThrown"}) - private void commit() { - if (commit.compareAndSet(false, true)) { - try { - tx.commit0(); - - onComplete(); - } - catch (IgniteTxTimeoutCheckedException e) { - onError(e); - } - catch (IgniteCheckedException e) { - if (tx.state() == UNKNOWN) { - onError(new IgniteTxHeuristicCheckedException("Commit only partially succeeded " + - "(entries will be invalidated on remote nodes once transaction timeout passes): " + - tx, e)); - } - else { - onError(new IgniteTxRollbackCheckedException( - "Failed to commit transaction (will attempt rollback): " + tx, e)); - } - } - } - } - - /** {@inheritDoc} */ - @SuppressWarnings({"ThrowableInstanceNeverThrown"}) - @Override public boolean cancel() { - if (log.isDebugEnabled()) - log.debug("Attempting to cancel transaction: " + tx); - - // Attempt rollback. - if (onCancelled()) { - try { - tx.rollback(); - } - catch (IgniteCheckedException ex) { - U.error(log, "Failed to rollback the transaction: " + tx, ex); - } - - if (log.isDebugEnabled()) - log.debug("Transaction was cancelled and rolled back: " + tx); - - return true; - } - - return isCancelled(); - } - - /** - * Completeness callback. - */ - private void onComplete() { - if (onDone(tx, err.get())) - cctx.mvcc().removeFuture(this); - } - - /** - * Checks for errors. - * - * @throws IgniteCheckedException If execution failed. - */ - private void checkError() throws IgniteCheckedException { - if (err.get() != null) - throw U.cast(err.get()); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return GridToStringBuilder.toString(GridLocalTxFuture.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index abdb99c..6200593 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -43,7 +43,6 @@ import java.util.concurrent.atomic.*; import java.util.concurrent.locks.*; import static org.apache.ignite.events.EventType.*; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.*; import static org.apache.ignite.transactions.TransactionConcurrency.*; @@ -135,6 +134,9 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter /** System transaction flag. */ private boolean sys; + /** IO policy. */ + private GridIoPolicy plc; + /** */ protected boolean onePhaseCommit; @@ -225,6 +227,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter * @param implicitSingle Implicit with one key flag. * @param loc Local flag. * @param sys System transaction flag. + * @param plc IO policy. * @param concurrency Concurrency. * @param isolation Isolation. * @param timeout Timeout. @@ -238,6 +241,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter boolean implicitSingle, boolean loc, boolean sys, + GridIoPolicy plc, TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout, @@ -257,6 +261,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter this.implicitSingle = implicitSingle; this.loc = loc; this.sys = sys; + this.plc = plc; this.concurrency = concurrency; this.isolation = isolation; this.timeout = timeout; @@ -283,6 +288,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter * @param startVer Start version mark. * @param threadId Thread ID. * @param sys System transaction flag. + * @param plc IO policy. * @param concurrency Concurrency. * @param isolation Isolation. * @param timeout Timeout. @@ -296,6 +302,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter GridCacheVersion startVer, long threadId, boolean sys, + GridIoPolicy plc, TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout, @@ -310,6 +317,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter this.xidVer = xidVer; this.startVer = startVer; this.sys = sys; + this.plc = plc; this.concurrency = concurrency; this.isolation = isolation; this.timeout = timeout; @@ -412,7 +420,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter /** {@inheritDoc} */ @Override public GridIoPolicy ioPolicy() { - return sys ? UTILITY_CACHE_POOL : SYSTEM_POOL; + return plc; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index a14902d..430f073 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -269,6 +269,7 @@ public class IgniteTxHandler<K, V> { req.implicitSingle(), req.implicitSingle(), req.system(), + req.policy(), req.concurrency(), req.isolation(), req.timeout(), @@ -506,7 +507,7 @@ public class IgniteTxHandler<K, V> { req.miniId(), new IgniteCheckedException("Transaction has been already completed.")); try { - ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + ctx.io().send(nodeId, res, req.policy()); } catch (Throwable e) { // Double-check. @@ -538,6 +539,7 @@ public class IgniteTxHandler<K, V> { true, false, /* we don't know, so assume false. */ req.system(), + req.policy(), PESSIMISTIC, READ_COMMITTED, /*timeout */0, @@ -919,6 +921,7 @@ public class IgniteTxHandler<K, V> { req.version(), null, req.system(), + req.policy(), req.concurrency(), req.isolation(), req.isInvalidate(), @@ -1038,6 +1041,7 @@ public class IgniteTxHandler<K, V> { req.version(), null, req.system(), + req.policy(), req.concurrency(), req.isolation(), req.isInvalidate(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 8bc5230..ab6721c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.transactions; import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.dr.*; @@ -116,6 +117,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * {@code false} if it was started explicitly by user. * @param implicitSingle {@code True} if transaction is implicit with only one key. * @param sys System flag. + * @param plc IO policy. * @param concurrency Concurrency. * @param isolation Isolation. * @param timeout Timeout. @@ -129,6 +131,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> boolean implicit, boolean implicitSingle, boolean sys, + GridIoPolicy plc, TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout, @@ -140,8 +143,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> @Nullable UUID subjId, int taskNameHash ) { - super(cctx, xidVer, implicit, implicitSingle, /*local*/true, sys, concurrency, isolation, timeout, invalidate, - storeEnabled, txSize, grpLockKey, subjId, taskNameHash); + super(cctx, xidVer, implicit, implicitSingle, /*local*/true, sys, plc, concurrency, isolation, timeout, + invalidate, storeEnabled, txSize, grpLockKey, subjId, taskNameHash); assert !partLock || grpLockKey != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index bcfe1c2..b92a542 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.transactions; import org.apache.ignite.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; @@ -376,6 +377,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { implicit, implicitSingle, sysCacheCtx != null, + sysCacheCtx != null ? sysCacheCtx.ioPolicy() : GridIoPolicy.SYSTEM_POOL, concurrency, isolation, timeout, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java index afce47b..4911f1b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java @@ -46,6 +46,7 @@ public class GridTestKernalContext extends GridKernalContextImpl { null, null, null, + null, null); GridTestUtils.setFieldValue(grid(), "cfg", config());