Repository: incubator-ignite Updated Branches: refs/heads/ignite-141 f06aefc27 -> fb13a9a0d
IGNITE-141 - Fixed issue with system transactions interconnection. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fb13a9a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fb13a9a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fb13a9a0 Branch: refs/heads/ignite-141 Commit: fb13a9a0d7b35f6249fa06548bbf871e7294e1d4 Parents: f06aefc Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Mon Mar 2 13:51:30 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Mon Mar 2 13:51:30 2015 -0800 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 59 +----- .../ignite/internal/IgniteTransactionsEx.java | 16 -- .../processors/cache/GridCacheAdapter.java | 16 +- .../cache/GridCacheSharedContext.java | 6 + .../dht/GridDhtTransactionalCacheAdapter.java | 4 +- .../dht/colocated/GridDhtColocatedCache.java | 2 +- .../near/GridNearTransactionalCache.java | 4 +- .../transactions/IgniteTransactionsImpl.java | 36 +--- .../cache/transactions/IgniteTxHandler.java | 8 +- .../cache/transactions/IgniteTxManager.java | 125 ++++++++++-- .../IgniteCacheSystemTransactionsSelfTest.java | 188 +++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 1 + .../processors/cache/jta/CacheJtaManager.java | 4 +- 13 files changed, 337 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 90d283a..4143457 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2738,15 +2738,16 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { if (cache == null) cache = ctx.cache().marshallerCache(); - // TODO: IGNITE-141 - Do not create thread. - Thread t = new Thread(new MarshallerCacheUpdater(ctx.log(), cache, id, clsName)); + try { + String old = cache.putIfAbsent(id, clsName); - t.start(); + if (old != null && !old.equals(clsName)) + throw new IgniteException("Type ID collision occured in OptimizedMarshaller. Use " + + "OptimizedMarshallerIdMapper to resolve it [id=" + id + ", clsName1=" + clsName + + "clsName2=" + old + ']'); - try { - t.join(); } - catch (InterruptedException e) { + catch (IgniteCheckedException e) { throw new IgniteException(e); } } @@ -2764,50 +2765,4 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } } } - - /** - */ - private static class MarshallerCacheUpdater implements Runnable { - /** */ - private final IgniteLogger log; - - /** */ - private final GridCacheAdapter<Integer, String> cache; - - /** */ - private final int typeId; - - /** */ - private final String clsName; - - /** - * @param cache Cache. - * @param typeId Type ID. - * @param clsName Class name. - */ - private MarshallerCacheUpdater(IgniteLogger log, GridCacheAdapter<Integer, String> cache, int typeId, String clsName) { - this.log = log; - this.cache = cache; - this.typeId = typeId; - this.clsName = clsName; - } - - /** {@inheritDoc} */ - @Override public void run() { - try { - // TODO: IGNITE-141 - Remove debug - U.debug(log, ">>> REGISTER: " + clsName); - - String old = cache.putIfAbsent(typeId, clsName); - - if (old != null && !old.equals(clsName)) - throw new IgniteException("Type ID collision acquired in OptimizedMarshaller. 
Use " + - "OptimizedMarshallerIdMapper to resolve it [id=" + typeId + ", clsName1=" + clsName + - "clsName2=" + old + ']'); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java index 3ba0bdb..4e60659 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal; import org.apache.ignite.*; -import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.transactions.*; @@ -28,21 +27,6 @@ import org.apache.ignite.transactions.*; */ public interface IgniteTransactionsEx extends IgniteTransactions { /** - * Starts transaction with specified isolation, concurrency, timeout, invalidation flag, - * and number of participating entries. - * - * @param concurrency Concurrency. - * @param isolation Isolation. - * @param timeout Timeout. - * @param txSize Number of entries participating in transaction (may be approximate). - * @return New transaction. - * @throws IllegalStateException If transaction is already started by this thread. - * @throws UnsupportedOperationException If cache is {@link CacheAtomicityMode#ATOMIC}. - */ - public Transaction txStartSystem(TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout, - int txSize); - - /** * @param ctx Cache context. * @param concurrency Concurrency. * @param isolation Isolation. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 12ea535..6c15a61 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2122,7 +2122,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return new GridFinishedFuture<>(ctx.kernalContext(), e); } - tx = ctx.tm().threadLocalTx(); + tx = ctx.tm().threadLocalTx(ctx.system() ? ctx : null); } if (tx == null || tx.implicit()) { @@ -3680,7 +3680,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Nullable @Override public Transaction tx() { - IgniteTxAdapter<K, V> tx = ctx.tm().threadLocalTx(); + IgniteTxAdapter<K, V> tx = ctx.tm().threadLocalTx(null); return tx == null ? null : new TransactionProxyImpl<>(tx, ctx.shared(), false); } @@ -3825,9 +3825,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, TransactionIsolation isolation, long timeout, int txSize) throws IllegalStateException { IgniteTransactionsEx txs = ctx.kernalContext().cache().transactions(); - return ctx.system() ? 
- txs.txStartSystem(concurrency, isolation, timeout, txSize) : - txs.txStart(concurrency, isolation, timeout, txSize); + return txs.txStartEx(ctx, concurrency, isolation, timeout, txSize).proxy(); } /** {@inheritDoc} */ @@ -4546,7 +4544,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, awaitLastFut(); - IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx(); + IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx(ctx); if (tx == null || tx.implicit()) { TransactionConfiguration tCfg = ctx.gridConfig().getTransactionConfiguration(); @@ -4554,7 +4552,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, tx = ctx.tm().newTx( true, op.single(), - ctx.system(), + ctx.system() ? ctx : null, OPTIMISTIC, READ_COMMITTED, tCfg.getDefaultTxTimeout(), @@ -4623,13 +4621,13 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (log.isDebugEnabled()) log.debug("Performing async op: " + op); - IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx(); + IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx(ctx); if (tx == null || tx.implicit()) { tx = ctx.tm().newTx( true, op.single(), - ctx.system(), + ctx.system() ? 
ctx : null, OPTIMISTIC, READ_COMMITTED, ctx.kernalContext().config().getTransactionConfiguration().getDefaultTxTimeout(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 6b17038..fb7c79f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -407,6 +407,12 @@ public class GridCacheSharedContext<K, V> { for (Integer cacheId : activeCacheIds) { GridCacheContext<K, V> activeCacheCtx = cacheContext(cacheId); + // System transactions may span only one cache. + if (cacheCtx.system()) { + if (activeCacheCtx.cacheId() != cacheCtx.cacheId()) + return false; + } + // Check that caches have the same store. 
if (activeCacheCtx.store().store() != cacheCtx.store().store()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 753f7e9..3fa0b89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -206,7 +206,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach req.subjectId(), req.taskNameHash()); - tx = ctx.tm().onCreated(tx); + tx = ctx.tm().onCreated(null, tx); if (tx == null || !ctx.tm().onStarted(tx)) throw new IgniteTxRollbackCheckedException("Failed to acquire lock (transaction " + @@ -804,7 +804,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach tx.syncCommit(req.syncCommit()); - tx = ctx.tm().onCreated(tx); + tx = ctx.tm().onCreated(null, tx); if (tx == null || !tx.init()) { String msg = "Failed to acquire lock (transaction has been completed): " + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index cdb1759..9467bd1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -169,7 +169,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (F.isEmpty(keys)) return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); - IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx(); + IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx(ctx); if (tx != null && !tx.implicit() && !skipTx) { return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index d6ec9dd..6255588 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -111,7 +111,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> if (F.isEmpty(keys)) return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); - IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx(); + IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx(ctx); if (tx != null && 
!tx.implicit() && !skipTx) { return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) { @@ -305,7 +305,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> if (req.groupLock()) tx.groupLockKey(txKey); - tx = ctx.tm().onCreated(tx); + tx = ctx.tm().onCreated(null, tx); if (tx == null || !ctx.tm().onStarted(tx)) throw new IgniteTxRollbackCheckedException("Failed to acquire lock " + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java index 19b8a78..b43b541 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java @@ -49,7 +49,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { cfg.getDefaultTxIsolation(), cfg.getDefaultTxTimeout(), 0, - false + null ).proxy(); } @@ -65,7 +65,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { isolation, cfg.getDefaultTxTimeout(), 0, - false + null ).proxy(); } @@ -82,7 +82,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { isolation, timeout, txSize, - false + null ).proxy(); } @@ -103,7 +103,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { isolation, timeout, txSize, - ctx.system()); + ctx); } /** {@inheritDoc} */ @@ -121,24 +121,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { isolation, cfg.getDefaultTxTimeout(), 0, - ctx.system()); - } - - /** 
{@inheritDoc} */ - @Override public Transaction txStartSystem(TransactionConcurrency concurrency, TransactionIsolation isolation, - long timeout, int txSize) { - A.notNull(concurrency, "concurrency"); - A.notNull(isolation, "isolation"); - A.ensure(timeout >= 0, "timeout cannot be negative"); - A.ensure(txSize >= 0, "transaction size cannot be negative"); - - return txStart0( - concurrency, - isolation, - timeout, - txSize, - true - ).proxy(); + ctx); } /** @@ -146,18 +129,19 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { * @param isolation Transaction isolation. * @param timeout Transaction timeout. * @param txSize Expected transaction size. - * @param sys System flag. + * @param sysCacheCtx System cache context. * @return Transaction. */ + @SuppressWarnings("unchecked") private IgniteInternalTx txStart0(TransactionConcurrency concurrency, TransactionIsolation isolation, - long timeout, int txSize, boolean sys) { + long timeout, int txSize, @Nullable GridCacheContext sysCacheCtx) { TransactionConfiguration cfg = cctx.gridConfig().getTransactionConfiguration(); if (!cfg.isTxSerializableEnabled() && isolation == SERIALIZABLE) throw new IllegalArgumentException("SERIALIZABLE isolation level is disabled (to enable change " + "'txSerializableEnabled' configuration property)"); - IgniteInternalTx<K, V> tx = (IgniteInternalTx<K, V>)cctx.tm().userTx(); + IgniteInternalTx<K, V> tx = (IgniteInternalTx<K, V>)cctx.tm().userTx(sysCacheCtx); if (tx != null) throw new IllegalStateException("Failed to start new transaction " + @@ -166,7 +150,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { tx = cctx.tm().newTx( false, false, - sys, + sysCacheCtx, concurrency, isolation, timeout, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index b29f721..a14902d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -282,7 +282,7 @@ public class IgniteTxHandler<K, V> { req.taskNameHash() ); - tx = ctx.tm().onCreated(tx); + tx = ctx.tm().onCreated(null, tx); if (tx != null) tx.topologyVersion(req.topologyVersion()); @@ -527,7 +527,7 @@ public class IgniteTxHandler<K, V> { if (req.commit()) { if (tx == null) { // Create transaction and add entries. - tx = ctx.tm().onCreated( + tx = ctx.tm().onCreated(null, new GridDhtTxLocal<>( ctx, nodeId, @@ -932,7 +932,7 @@ public class IgniteTxHandler<K, V> { tx.writeVersion(req.writeVersion()); - tx = ctx.tm().onCreated(tx); + tx = ctx.tm().onCreated(null, tx); if (tx == null || !ctx.tm().onStarted(tx)) { if (log.isDebugEnabled()) @@ -1052,7 +1052,7 @@ public class IgniteTxHandler<K, V> { tx.writeVersion(req.writeVersion()); if (!tx.empty()) { - tx = ctx.tm().onCreated(tx); + tx = ctx.tm().onCreated(null, tx); if (tx == null || !ctx.tm().onStarted(tx)) throw new IgniteTxRollbackCheckedException("Attempt to start a completed transaction: " + tx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index af57ce4..bcfe1c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -70,6 +70,9 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { /** Per-thread transaction map. */ private final ConcurrentMap<Long, IgniteInternalTx<K, V>> threadMap = newMap(); + /** Per-thread system transaction map. */ + private final ConcurrentMap<TxThreadKey, IgniteInternalTx<K, V>> sysThreadMap = newMap(); + /** Per-ID map. */ private final ConcurrentMap<GridCacheVersion, IgniteInternalTx<K, V>> idMap = newMap(); @@ -353,7 +356,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { public IgniteTxLocalAdapter<K, V> newTx( boolean implicit, boolean implicitSingle, - boolean sys, + @Nullable GridCacheContext<K, V> sysCacheCtx, TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout, @@ -362,6 +365,8 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { int txSize, @Nullable IgniteTxKey grpLockKey, boolean partLock) { + assert sysCacheCtx == null || sysCacheCtx.system(); + UUID subjId = null; // TODO GG-9141 how to get subj ID? int taskNameHash = cctx.kernalContext().job().currentTaskNameHash(); @@ -370,7 +375,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { cctx, implicit, implicitSingle, - sys, + sysCacheCtx != null, concurrency, isolation, timeout, @@ -382,14 +387,14 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { subjId, taskNameHash); - return onCreated(tx); + return onCreated(sysCacheCtx, tx); } /** * @param tx Created transaction. * @return Started transaction. 
*/ - @Nullable public <T extends IgniteInternalTx<K, V>> T onCreated(T tx) { + @Nullable public <T extends IgniteInternalTx<K, V>> T onCreated(@Nullable GridCacheContext<K, V> cacheCtx, T tx) { ConcurrentMap<GridCacheVersion, IgniteInternalTx<K, V>> txIdMap = transactionMap(tx); // Start clean. @@ -408,8 +413,12 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { // Add both, explicit and implicit transactions. // Do not add remote and dht local transactions as remote node may have the same thread ID // and overwrite local transaction. - if (tx.local() && !tx.dht()) - threadMap.put(tx.threadId(), tx); + if (tx.local() && !tx.dht()) { + if (cacheCtx == null || !cacheCtx.system()) + threadMap.put(tx.threadId(), tx); + else + sysThreadMap.put(new TxThreadKey(tx.threadId(), cacheCtx.cacheId()), tx); + } // Handle mapped versions. if (tx instanceof GridCacheMappedVersion) { @@ -616,8 +625,8 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { * @return Transaction for current thread. */ @SuppressWarnings({"unchecked"}) - public <T> T threadLocalTx() { - IgniteInternalTx<K, V> tx = tx(Thread.currentThread().getId()); + public <T> T threadLocalTx(GridCacheContext<K, V> cctx) { + IgniteInternalTx<K, V> tx = tx(cctx, Thread.currentThread().getId()); return tx != null && tx.local() && (!tx.dht() || tx.colocated()) && !tx.implicit() ? (T)tx : null; } @@ -629,7 +638,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { public <T> T tx() { IgniteInternalTx<K, V> tx = txContext(); - return tx != null ? (T)tx : (T)tx(Thread.currentThread().getId()); + return tx != null ? 
(T)tx : (T)tx(null, Thread.currentThread().getId()); } /** @@ -658,7 +667,16 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { if (tx != null && tx.user() && tx.state() == ACTIVE) return tx; - tx = tx(Thread.currentThread().getId()); + tx = tx(null, Thread.currentThread().getId()); + + return tx != null && tx.user() && tx.state() == ACTIVE ? tx : null; + } + + /** + * @return User transaction for current thread. + */ + @Nullable public IgniteInternalTx userTx(GridCacheContext<K, V> cctx) { + IgniteInternalTx<K, V> tx = tx(cctx, Thread.currentThread().getId()); return tx != null && tx.user() && tx.state() == ACTIVE ? tx : null; } @@ -676,8 +694,13 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { * @return Transaction for thread with given ID. */ @SuppressWarnings({"unchecked"}) - public <T> T tx(long threadId) { - return (T)threadMap.get(threadId); + private <T> T tx(GridCacheContext<K, V> cctx, long threadId) { + if (cctx == null || !cctx.system()) + return (T)threadMap.get(threadId); + + TxThreadKey key = new TxThreadKey(threadId, cctx.cacheId()); + + return (T)sysThreadMap.get(key); } /** @@ -1215,8 +1238,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { committedQ.add(tx); // 11. Remove from per-thread storage. - if (tx.local() && !tx.dht()) - threadMap.remove(tx.threadId(), tx); + clearThreadMap(tx); // 12. Unregister explicit locks. if (!tx.alternateVersions().isEmpty()) { @@ -1295,8 +1317,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { decrementStartVersionCount(tx); // 7. Remove from per-thread storage. - if (tx.local() && !tx.dht()) - threadMap.remove(tx.threadId(), tx); + clearThreadMap(tx); // 8. Unregister explicit locks. if (!tx.alternateVersions().isEmpty()) @@ -1359,8 +1380,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { decrementStartVersionCount(tx); // 5. 
Remove from per-thread storage. - if (tx.local() && !tx.dht()) - threadMap.remove(tx.threadId(), tx); + clearThreadMap(tx); // 6. Unregister explicit locks. if (!tx.alternateVersions().isEmpty()) @@ -1382,6 +1402,33 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { } /** + * @param tx Transaction to clear. + */ + private void clearThreadMap(IgniteInternalTx<K, V> tx) { + if (tx.local() && !tx.dht()) { + if (!tx.system()) + threadMap.remove(tx.threadId(), tx); + else { + Integer cacheId = F.first(tx.activeCacheIds()); + + if (cacheId != null) + sysThreadMap.remove(new TxThreadKey(tx.threadId(), cacheId), tx); + else { + for (Iterator<IgniteInternalTx<K, V>> it = sysThreadMap.values().iterator(); it.hasNext(); ) { + IgniteInternalTx<K, V> txx = it.next(); + + if (tx == txx) { + it.remove(); + + break; + } + } + } + } + } + } + + /** * Gets transaction ID map depending on transaction type. * * @param tx Transaction. @@ -2015,6 +2062,48 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { } /** + * Per-thread key for system transactions. + */ + private static class TxThreadKey { + /** Thread ID. */ + private long threadId; + + /** Cache ID. */ + private int cacheId; + + /** + * @param threadId Thread ID. + * @param cacheId Cache ID. 
+ */ + private TxThreadKey(long threadId, int cacheId) { + this.threadId = threadId; + this.cacheId = cacheId; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof TxThreadKey)) + return false; + + TxThreadKey that = (TxThreadKey)o; + + return cacheId == that.cacheId && threadId == that.threadId; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = (int)(threadId ^ (threadId >>> 32)); + + result = 31 * result + cacheId; + + return result; + } + } + + /** * */ private static class CommittedVersion extends GridCacheVersion { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java new file mode 100644 index 0000000..66ffe61 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.transactions.*; + +import java.util.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; + +/** + * Tests that system transactions do not interact with user transactions. 
+ */
+public class IgniteCacheSystemTransactionsSelfTest extends GridCacheAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        CacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // The kernal does not depend on the cache being cleaned, so look it up once.
+        IgniteKernal kernal = (IgniteKernal)ignite(0);
+
+        // Remove the keys used by the tests from the default, utility and marshaller caches.
+        for (String cacheName : new String[] {null, CU.UTILITY_CACHE_NAME, CU.MARSH_CACHE_NAME}) {
+            GridCacheAdapter<Object, Object> cache = kernal.context().cache().internalCache(cacheName);
+
+            cache.removeAll(F.asList("1", "2", "3"));
+        }
+    }
+
+    /**
+     * Starts a utility cache transaction while a user transaction is open on the same thread,
+     * commits both and checks that each cache contains only the values written to it.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSystemTxInsideUserTx() throws Exception {
+        IgniteKernal ignite = (IgniteKernal)grid(0);
+
+        IgniteCache<Object, Object> jcache = ignite.jcache(null);
+
+        try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            jcache.get("1");
+            jcache.put("1", "11");
+
+            GridCacheAdapter<Object, Object> utilityCache = ignite.context().cache().utilityCache();
+
+            utilityCache.putIfAbsent("2", "2");
+
+            try (IgniteInternalTx itx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                // Updates made through the user cache must not be visible in the utility cache.
+                assertEquals(null, utilityCache.get("1"));
+                assertEquals("2", utilityCache.get("2"));
+                assertEquals(null, utilityCache.get("3"));
+
+                utilityCache.put("3", "3");
+
+                itx.commit();
+            }
+
+            jcache.put("2", "22");
+
+            tx.commit();
+        }
+
+        checkTransactionsCommitted();
+
+        checkEntries(null, "1", "11", "2", "22", "3", null);
+        checkEntries(CU.UTILITY_CACHE_NAME, "1", null, "2", "2", "3", "3");
+    }
+
+    /**
+     * Starts a marshaller cache transaction while a utility cache transaction is open on the same
+     * thread, commits both and checks that each cache contains only the values written to it.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSystemMarshallerTxInsideSystemTx() throws Exception {
+        IgniteKernal ignite = (IgniteKernal)grid(0);
+
+        GridCacheAdapter<Object, Object> utilityCache = ignite.context().cache().utilityCache();
+
+        try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+            utilityCache.get("1");
+            utilityCache.put("1", "11");
+
+            CacheProjection<String,String> marshallerCache =
+                (GridCacheAdapter<String, String>)(GridCacheAdapter)ignite.context().cache().marshallerCache();
+
+            marshallerCache.putIfAbsent("2", "2");
+
+            try (IgniteInternalTx itx = marshallerCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                // Updates made through the utility cache must not be visible in the marshaller cache.
+                assertEquals(null, marshallerCache.get("1"));
+                assertEquals("2", marshallerCache.get("2"));
+                assertEquals(null, marshallerCache.get("3"));
+
+                marshallerCache.put("3", "3");
+
+                itx.commit();
+            }
+
+            utilityCache.put("2", "22");
+
+            tx.commit();
+        }
+
+        checkTransactionsCommitted();
+
+        // Keys must be the same String keys that were written above: checkEntries() skips keys
+        // for which no entry is found, so Integer keys would make these checks vacuous.
+        checkEntries(CU.UTILITY_CACHE_NAME, "1", "11", "2", "22", "3", null);
+        checkEntries(CU.MARSH_CACHE_NAME, "1", null, "2", "2", "3", "3");
+    }
+
+    /**
+     * Asserts that on every grid the {@code threadMap}, {@code sysThreadMap} and {@code idMap}
+     * fields of the transaction manager are empty.
+     *
+     * @throws Exception If failed.
+     */
+    private void checkTransactionsCommitted() throws Exception {
+        for (int i = 0; i < gridCount(); i++) {
+            IgniteKernal kernal = (IgniteKernal)grid(i);
+
+            IgniteTxManager<Object, Object> tm = kernal.context().cache().context().tm();
+
+            Map map = U.field(tm, "threadMap");
+
+            assertEquals(0, map.size());
+
+            map = U.field(tm, "sysThreadMap");
+
+            assertEquals(0, map.size());
+
+            map = U.field(tm, "idMap");
+
+            assertEquals(0, map.size());
+        }
+    }
+
+    /**
+     * Checks on every grid that the given entries are not locked and hold the expected raw values.
+     * A key for which {@code peekEx} returns {@code null} on a grid is skipped on that grid.
+     *
+     * @param cacheName Cache to check.
+     * @param vals Key-value pairs: alternating key and expected value.
+     * @throws Exception If failed.
+     */
+    private void checkEntries(String cacheName, Object... vals) throws Exception {
+        // Fail with a clear message instead of an ArrayIndexOutOfBoundsException on a dangling key.
+        assertEquals("Key-value pairs expected [cacheName=" + cacheName + ']', 0, vals.length % 2);
+
+        for (int g = 0; g < gridCount(); g++) {
+            IgniteKernal kernal = (IgniteKernal)grid(g);
+
+            GridCacheAdapter<Object, Object> cache = kernal.context().cache().internalCache(cacheName);
+
+            for (int i = 0; i < vals.length; i += 2) {
+                Object key = vals[i];
+                Object val = vals[i + 1];
+
+                GridCacheEntryEx<Object, Object> entry = cache.peekEx(key);
+
+                if (entry != null) {
+                    assertFalse("Entry is locked [g=" + g + ", cacheName=" + cacheName + ", entry=" + entry + ']',
+                        entry.lockedByAny());
+
+                    assertEquals("Invalid entry value [g=" + g + ", cacheName=" + cacheName + ", entry=" + entry + ']',
+                        val, entry.rawGet());
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 6b3eb4a..1fa67c5 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -142,6 +142,7 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(GridCacheReplicatedLocalStoreSelfTest.class); suite.addTestSuite(GridCachePartitionedOffHeapLocalStoreSelfTest.class); suite.addTestSuite(GridCacheTxPartitionedLocalStoreSelfTest.class); + suite.addTestSuite(IgniteCacheSystemTransactionsSelfTest.class); // Heuristic exception handling. 
TODO IGNITE-257 // suite.addTestSuite(GridCacheColocatedTxExceptionSelfTest.class); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java ---------------------------------------------------------------------- diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java index 6077e4a..b8bf599 100644 --- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java +++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java @@ -80,7 +80,7 @@ public class CacheJtaManager<K, V> extends CacheJtaManagerAdapter<K, V> { tx = cctx.tm().newTx( /*implicit*/false, /*implicit single*/false, - /*system*/false, + null, tCfg.getDefaultTxConcurrency(), tCfg.getDefaultTxIsolation(), tCfg.getDefaultTxTimeout(), @@ -92,7 +92,7 @@ public class CacheJtaManager<K, V> extends CacheJtaManagerAdapter<K, V> { ); } - rsrc = new GridCacheXAResource((IgniteInternalTx)tx, cctx); + rsrc = new GridCacheXAResource(tx, cctx); if (!jtaTx.enlistResource(rsrc)) throw new IgniteCheckedException("Failed to enlist XA resource to JTA user transaction.");