Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-141 f06aefc27 -> fb13a9a0d


IGNITE-141 - Fixed issue with system transactions interconnection.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fb13a9a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fb13a9a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fb13a9a0

Branch: refs/heads/ignite-141
Commit: fb13a9a0d7b35f6249fa06548bbf871e7294e1d4
Parents: f06aefc
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Mon Mar 2 13:51:30 2015 -0800
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Mon Mar 2 13:51:30 2015 -0800

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  59 +-----
 .../ignite/internal/IgniteTransactionsEx.java   |  16 --
 .../processors/cache/GridCacheAdapter.java      |  16 +-
 .../cache/GridCacheSharedContext.java           |   6 +
 .../dht/GridDhtTransactionalCacheAdapter.java   |   4 +-
 .../dht/colocated/GridDhtColocatedCache.java    |   2 +-
 .../near/GridNearTransactionalCache.java        |   4 +-
 .../transactions/IgniteTransactionsImpl.java    |  36 +---
 .../cache/transactions/IgniteTxHandler.java     |   8 +-
 .../cache/transactions/IgniteTxManager.java     | 125 ++++++++++--
 .../IgniteCacheSystemTransactionsSelfTest.java  | 188 +++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   1 +
 .../processors/cache/jta/CacheJtaManager.java   |   4 +-
 13 files changed, 337 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 90d283a..4143457 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2738,15 +2738,16 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             if (cache == null)
                 cache = ctx.cache().marshallerCache();
 
-            // TODO: IGNITE-141 - Do not create thread.
-            Thread t = new Thread(new MarshallerCacheUpdater(ctx.log(), cache, 
id, clsName));
+            try {
+                String old = cache.putIfAbsent(id, clsName);
 
-            t.start();
+                if (old != null && !old.equals(clsName))
+                    throw new IgniteException("Type ID collision occured in 
OptimizedMarshaller. Use " +
+                        "OptimizedMarshallerIdMapper to resolve it [id=" + id 
+ ", clsName1=" + clsName +
+                        "clsName2=" + old + ']');
 
-            try {
-                t.join();
             }
-            catch (InterruptedException e) {
+            catch (IgniteCheckedException e) {
                 throw new IgniteException(e);
             }
         }
@@ -2764,50 +2765,4 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             }
         }
     }
-
-    /**
-     */
-    private static class MarshallerCacheUpdater implements Runnable {
-        /** */
-        private final IgniteLogger log;
-
-        /** */
-        private final GridCacheAdapter<Integer, String> cache;
-
-        /** */
-        private final int typeId;
-
-        /** */
-        private final String clsName;
-
-        /**
-         * @param cache Cache.
-         * @param typeId Type ID.
-         * @param clsName Class name.
-         */
-        private MarshallerCacheUpdater(IgniteLogger log, 
GridCacheAdapter<Integer, String> cache, int typeId, String clsName) {
-            this.log = log;
-            this.cache = cache;
-            this.typeId = typeId;
-            this.clsName = clsName;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void run() {
-            try {
-                // TODO: IGNITE-141 - Remove debug
-                U.debug(log, ">>> REGISTER: " + clsName);
-
-                String old = cache.putIfAbsent(typeId, clsName);
-
-                if (old != null && !old.equals(clsName))
-                    throw new IgniteException("Type ID collision acquired in 
OptimizedMarshaller. Use " +
-                        "OptimizedMarshallerIdMapper to resolve it [id=" + 
typeId + ", clsName1=" + clsName +
-                        "clsName2=" + old + ']');
-            }
-            catch (IgniteCheckedException e) {
-                throw U.convertException(e);
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java
index 3ba0bdb..4e60659 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.transactions.*;
@@ -28,21 +27,6 @@ import org.apache.ignite.transactions.*;
  */
 public interface IgniteTransactionsEx extends IgniteTransactions {
     /**
-     * Starts transaction with specified isolation, concurrency, timeout, 
invalidation flag,
-     * and number of participating entries.
-     *
-     * @param concurrency Concurrency.
-     * @param isolation Isolation.
-     * @param timeout Timeout.
-     * @param txSize Number of entries participating in transaction (may be 
approximate).
-     * @return New transaction.
-     * @throws IllegalStateException If transaction is already started by this 
thread.
-     * @throws UnsupportedOperationException If cache is {@link 
CacheAtomicityMode#ATOMIC}.
-     */
-    public Transaction txStartSystem(TransactionConcurrency concurrency, 
TransactionIsolation isolation, long timeout,
-        int txSize);
-
-    /**
      * @param ctx Cache context.
      * @param concurrency Concurrency.
      * @param isolation Isolation.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 12ea535..6c15a61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2122,7 +2122,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
                 return new GridFinishedFuture<>(ctx.kernalContext(), e);
             }
 
-            tx = ctx.tm().threadLocalTx();
+            tx = ctx.tm().threadLocalTx(ctx.system() ? ctx : null);
         }
 
         if (tx == null || tx.implicit()) {
@@ -3680,7 +3680,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
 
     /** {@inheritDoc} */
     @Nullable @Override public Transaction tx() {
-        IgniteTxAdapter<K, V> tx = ctx.tm().threadLocalTx();
+        IgniteTxAdapter<K, V> tx = ctx.tm().threadLocalTx(null);
 
         return tx == null ? null : new TransactionProxyImpl<>(tx, 
ctx.shared(), false);
     }
@@ -3825,9 +3825,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         TransactionIsolation isolation, long timeout, int txSize) throws 
IllegalStateException {
         IgniteTransactionsEx txs = ctx.kernalContext().cache().transactions();
 
-        return ctx.system() ?
-            txs.txStartSystem(concurrency, isolation, timeout, txSize) :
-            txs.txStart(concurrency, isolation, timeout, txSize);
+        return txs.txStartEx(ctx, concurrency, isolation, timeout, 
txSize).proxy();
     }
 
     /** {@inheritDoc} */
@@ -4546,7 +4544,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
 
         awaitLastFut();
 
-        IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx();
+        IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx(ctx);
 
         if (tx == null || tx.implicit()) {
             TransactionConfiguration tCfg = 
ctx.gridConfig().getTransactionConfiguration();
@@ -4554,7 +4552,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
             tx = ctx.tm().newTx(
                 true,
                 op.single(),
-                ctx.system(),
+                ctx.system() ? ctx : null,
                 OPTIMISTIC,
                 READ_COMMITTED,
                 tCfg.getDefaultTxTimeout(),
@@ -4623,13 +4621,13 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         if (log.isDebugEnabled())
             log.debug("Performing async op: " + op);
 
-        IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx();
+        IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx(ctx);
 
         if (tx == null || tx.implicit()) {
             tx = ctx.tm().newTx(
                 true,
                 op.single(),
-                ctx.system(),
+                ctx.system() ? ctx : null,
                 OPTIMISTIC,
                 READ_COMMITTED,
                 
ctx.kernalContext().config().getTransactionConfiguration().getDefaultTxTimeout(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 6b17038..fb7c79f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -407,6 +407,12 @@ public class GridCacheSharedContext<K, V> {
         for (Integer cacheId : activeCacheIds) {
             GridCacheContext<K, V> activeCacheCtx = cacheContext(cacheId);
 
+            // System transactions may span only one cache.
+            if (cacheCtx.system()) {
+                if (activeCacheCtx.cacheId() != cacheCtx.cacheId())
+                    return false;
+            }
+
             // Check that caches have the same store.
             if (activeCacheCtx.store().store() != cacheCtx.store().store())
                 return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 753f7e9..3fa0b89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -206,7 +206,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                                     req.subjectId(),
                                     req.taskNameHash());
 
-                                tx = ctx.tm().onCreated(tx);
+                                tx = ctx.tm().onCreated(null, tx);
 
                                 if (tx == null || !ctx.tm().onStarted(tx))
                                     throw new 
IgniteTxRollbackCheckedException("Failed to acquire lock (transaction " +
@@ -804,7 +804,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
                                 tx.syncCommit(req.syncCommit());
 
-                                tx = ctx.tm().onCreated(tx);
+                                tx = ctx.tm().onCreated(null, tx);
 
                                 if (tx == null || !tx.init()) {
                                     String msg = "Failed to acquire lock 
(transaction has been completed): " +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index cdb1759..9467bd1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -169,7 +169,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         if (F.isEmpty(keys))
             return new GridFinishedFuture<>(ctx.kernalContext(), 
Collections.<K, V>emptyMap());
 
-        IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx();
+        IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx(ctx);
 
         if (tx != null && !tx.implicit() && !skipTx) {
             return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index d6ec9dd..6255588 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -111,7 +111,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
         if (F.isEmpty(keys))
             return new GridFinishedFuture<>(ctx.kernalContext(), 
Collections.<K, V>emptyMap());
 
-        IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx();
+        IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx(ctx);
 
         if (tx != null && !tx.implicit() && !skipTx) {
             return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
@@ -305,7 +305,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
                                     if (req.groupLock())
                                         tx.groupLockKey(txKey);
 
-                                    tx = ctx.tm().onCreated(tx);
+                                    tx = ctx.tm().onCreated(null, tx);
 
                                     if (tx == null || !ctx.tm().onStarted(tx))
                                         throw new 
IgniteTxRollbackCheckedException("Failed to acquire lock " +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
index 19b8a78..b43b541 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
@@ -49,7 +49,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
             cfg.getDefaultTxIsolation(),
             cfg.getDefaultTxTimeout(),
             0,
-            false
+            null
         ).proxy();
     }
 
@@ -65,7 +65,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
             isolation,
             cfg.getDefaultTxTimeout(),
             0,
-            false
+            null
         ).proxy();
     }
 
@@ -82,7 +82,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
             isolation,
             timeout,
             txSize,
-            false
+            null
         ).proxy();
     }
 
@@ -103,7 +103,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
             isolation,
             timeout,
             txSize,
-            ctx.system());
+            ctx);
     }
 
     /** {@inheritDoc} */
@@ -121,24 +121,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
             isolation,
             cfg.getDefaultTxTimeout(),
             0,
-            ctx.system());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Transaction txStartSystem(TransactionConcurrency 
concurrency, TransactionIsolation isolation,
-        long timeout, int txSize) {
-        A.notNull(concurrency, "concurrency");
-        A.notNull(isolation, "isolation");
-        A.ensure(timeout >= 0, "timeout cannot be negative");
-        A.ensure(txSize >= 0, "transaction size cannot be negative");
-
-        return txStart0(
-            concurrency,
-            isolation,
-            timeout,
-            txSize,
-            true
-        ).proxy();
+            ctx);
     }
 
     /**
@@ -146,18 +129,19 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
      * @param isolation Transaction isolation.
      * @param timeout Transaction timeout.
      * @param txSize Expected transaction size.
-     * @param sys System flag.
+     * @param sysCacheCtx System cache context.
      * @return Transaction.
      */
+    @SuppressWarnings("unchecked")
     private IgniteInternalTx txStart0(TransactionConcurrency concurrency, 
TransactionIsolation isolation,
-        long timeout, int txSize, boolean sys) {
+        long timeout, int txSize, @Nullable GridCacheContext sysCacheCtx) {
         TransactionConfiguration cfg = 
cctx.gridConfig().getTransactionConfiguration();
 
         if (!cfg.isTxSerializableEnabled() && isolation == SERIALIZABLE)
             throw new IllegalArgumentException("SERIALIZABLE isolation level 
is disabled (to enable change " +
                 "'txSerializableEnabled' configuration property)");
 
-        IgniteInternalTx<K, V> tx = (IgniteInternalTx<K, V>)cctx.tm().userTx();
+        IgniteInternalTx<K, V> tx = (IgniteInternalTx<K, 
V>)cctx.tm().userTx(sysCacheCtx);
 
         if (tx != null)
             throw new IllegalStateException("Failed to start new transaction " 
+
@@ -166,7 +150,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
         tx = cctx.tm().newTx(
             false,
             false,
-            sys,
+            sysCacheCtx,
             concurrency,
             isolation,
             timeout,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index b29f721..a14902d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -282,7 +282,7 @@ public class IgniteTxHandler<K, V> {
                 req.taskNameHash()
             );
 
-            tx = ctx.tm().onCreated(tx);
+            tx = ctx.tm().onCreated(null, tx);
 
             if (tx != null)
                 tx.topologyVersion(req.topologyVersion());
@@ -527,7 +527,7 @@ public class IgniteTxHandler<K, V> {
             if (req.commit()) {
                 if (tx == null) {
                     // Create transaction and add entries.
-                    tx = ctx.tm().onCreated(
+                    tx = ctx.tm().onCreated(null,
                         new GridDhtTxLocal<>(
                             ctx,
                             nodeId,
@@ -932,7 +932,7 @@ public class IgniteTxHandler<K, V> {
 
                 tx.writeVersion(req.writeVersion());
 
-                tx = ctx.tm().onCreated(tx);
+                tx = ctx.tm().onCreated(null, tx);
 
                 if (tx == null || !ctx.tm().onStarted(tx)) {
                     if (log.isDebugEnabled())
@@ -1052,7 +1052,7 @@ public class IgniteTxHandler<K, V> {
                 tx.writeVersion(req.writeVersion());
 
                 if (!tx.empty()) {
-                    tx = ctx.tm().onCreated(tx);
+                    tx = ctx.tm().onCreated(null, tx);
 
                     if (tx == null || !ctx.tm().onStarted(tx))
                         throw new IgniteTxRollbackCheckedException("Attempt to 
start a completed transaction: " + tx);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index af57ce4..bcfe1c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -70,6 +70,9 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
     /** Per-thread transaction map. */
     private final ConcurrentMap<Long, IgniteInternalTx<K, V>> threadMap = 
newMap();
 
+    /** Per-thread system transaction map. */
+    private final ConcurrentMap<TxThreadKey, IgniteInternalTx<K, V>> 
sysThreadMap = newMap();
+
     /** Per-ID map. */
     private final ConcurrentMap<GridCacheVersion, IgniteInternalTx<K, V>> 
idMap = newMap();
 
@@ -353,7 +356,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
     public IgniteTxLocalAdapter<K, V> newTx(
         boolean implicit,
         boolean implicitSingle,
-        boolean sys,
+        @Nullable GridCacheContext<K, V> sysCacheCtx,
         TransactionConcurrency concurrency,
         TransactionIsolation isolation,
         long timeout,
@@ -362,6 +365,8 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
         int txSize,
         @Nullable IgniteTxKey grpLockKey,
         boolean partLock) {
+        assert sysCacheCtx == null || sysCacheCtx.system();
+
         UUID subjId = null; // TODO GG-9141 how to get subj ID?
 
         int taskNameHash = cctx.kernalContext().job().currentTaskNameHash();
@@ -370,7 +375,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
             cctx,
             implicit,
             implicitSingle,
-            sys,
+            sysCacheCtx != null,
             concurrency,
             isolation,
             timeout,
@@ -382,14 +387,14 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
             subjId,
             taskNameHash);
 
-        return onCreated(tx);
+        return onCreated(sysCacheCtx, tx);
     }
 
     /**
      * @param tx Created transaction.
      * @return Started transaction.
      */
-    @Nullable public <T extends IgniteInternalTx<K, V>> T onCreated(T tx) {
+    @Nullable public <T extends IgniteInternalTx<K, V>> T onCreated(@Nullable 
GridCacheContext<K, V> cacheCtx, T tx) {
         ConcurrentMap<GridCacheVersion, IgniteInternalTx<K, V>> txIdMap = 
transactionMap(tx);
 
         // Start clean.
@@ -408,8 +413,12 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
             // Add both, explicit and implicit transactions.
             // Do not add remote and dht local transactions as remote node may 
have the same thread ID
             // and overwrite local transaction.
-            if (tx.local() && !tx.dht())
-                threadMap.put(tx.threadId(), tx);
+            if (tx.local() && !tx.dht()) {
+                if (cacheCtx == null || !cacheCtx.system())
+                    threadMap.put(tx.threadId(), tx);
+                else
+                    sysThreadMap.put(new TxThreadKey(tx.threadId(), 
cacheCtx.cacheId()), tx);
+            }
 
             // Handle mapped versions.
             if (tx instanceof GridCacheMappedVersion) {
@@ -616,8 +625,8 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
      * @return Transaction for current thread.
      */
     @SuppressWarnings({"unchecked"})
-    public <T> T threadLocalTx() {
-        IgniteInternalTx<K, V> tx = tx(Thread.currentThread().getId());
+    public <T> T threadLocalTx(GridCacheContext<K, V> cctx) {
+        IgniteInternalTx<K, V> tx = tx(cctx, Thread.currentThread().getId());
 
         return tx != null && tx.local() && (!tx.dht() || tx.colocated()) && 
!tx.implicit() ? (T)tx : null;
     }
@@ -629,7 +638,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
     public <T> T tx() {
         IgniteInternalTx<K, V> tx = txContext();
 
-        return tx != null ? (T)tx : (T)tx(Thread.currentThread().getId());
+        return tx != null ? (T)tx : (T)tx(null, 
Thread.currentThread().getId());
     }
 
     /**
@@ -658,7 +667,16 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
         if (tx != null && tx.user() && tx.state() == ACTIVE)
             return tx;
 
-        tx = tx(Thread.currentThread().getId());
+        tx = tx(null, Thread.currentThread().getId());
+
+        return tx != null && tx.user() && tx.state() == ACTIVE ? tx : null;
+    }
+
+    /**
+     * @return User transaction for current thread.
+     */
+    @Nullable public IgniteInternalTx userTx(GridCacheContext<K, V> cctx) {
+        IgniteInternalTx<K, V> tx = tx(cctx, Thread.currentThread().getId());
 
         return tx != null && tx.user() && tx.state() == ACTIVE ? tx : null;
     }
@@ -676,8 +694,13 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
      * @return Transaction for thread with given ID.
      */
     @SuppressWarnings({"unchecked"})
-    public <T> T tx(long threadId) {
-        return (T)threadMap.get(threadId);
+    private <T> T tx(GridCacheContext<K, V> cctx, long threadId) {
+        if (cctx == null || !cctx.system())
+            return (T)threadMap.get(threadId);
+
+        TxThreadKey key = new TxThreadKey(threadId, cctx.cacheId());
+
+        return (T)sysThreadMap.get(key);
     }
 
     /**
@@ -1215,8 +1238,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
                 committedQ.add(tx);
 
             // 11. Remove from per-thread storage.
-            if (tx.local() && !tx.dht())
-                threadMap.remove(tx.threadId(), tx);
+            clearThreadMap(tx);
 
             // 12. Unregister explicit locks.
             if (!tx.alternateVersions().isEmpty()) {
@@ -1295,8 +1317,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
                 decrementStartVersionCount(tx);
 
             // 7. Remove from per-thread storage.
-            if (tx.local() && !tx.dht())
-                threadMap.remove(tx.threadId(), tx);
+            clearThreadMap(tx);
 
             // 8. Unregister explicit locks.
             if (!tx.alternateVersions().isEmpty())
@@ -1359,8 +1380,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
                 decrementStartVersionCount(tx);
 
             // 5. Remove from per-thread storage.
-            if (tx.local() && !tx.dht())
-                threadMap.remove(tx.threadId(), tx);
+            clearThreadMap(tx);
 
             // 6. Unregister explicit locks.
             if (!tx.alternateVersions().isEmpty())
@@ -1382,6 +1402,33 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
     }
 
     /**
+     * @param tx Transaction to clear.
+     */
+    private void clearThreadMap(IgniteInternalTx<K, V> tx) {
+        if (tx.local() && !tx.dht()) {
+            if (!tx.system())
+                threadMap.remove(tx.threadId(), tx);
+            else {
+                Integer cacheId = F.first(tx.activeCacheIds());
+
+                if (cacheId != null)
+                    sysThreadMap.remove(new TxThreadKey(tx.threadId(), 
cacheId), tx);
+                else {
+                    for (Iterator<IgniteInternalTx<K, V>> it = 
sysThreadMap.values().iterator(); it.hasNext(); ) {
+                        IgniteInternalTx<K, V> txx = it.next();
+
+                        if (tx == txx) {
+                            it.remove();
+
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
      * Gets transaction ID map depending on transaction type.
      *
      * @param tx Transaction.
@@ -2015,6 +2062,48 @@ public class IgniteTxManager<K, V> extends 
GridCacheSharedManagerAdapter<K, V> {
     }
 
     /**
+     * Per-thread key for system transactions: an immutable (thread ID, cache ID) pair used as a map key.
+     */
+    private static class TxThreadKey {
+        /** Thread ID. */
+        private final long threadId;
+
+        /** Cache ID. */
+        private final int cacheId;
+
+        /**
+         * @param threadId Thread ID.
+         * @param cacheId Cache ID.
+         */
+        private TxThreadKey(long threadId, int cacheId) {
+            this.threadId = threadId;
+            this.cacheId = cacheId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (!(o instanceof TxThreadKey))
+                return false;
+
+            TxThreadKey that = (TxThreadKey)o;
+
+            return cacheId == that.cacheId && threadId == that.threadId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int result = (int)(threadId ^ (threadId >>> 32));
+
+            result = 31 * result + cacheId;
+
+            return result;
+        }
+    }
+
+    /**
      *
      */
     private static class CommittedVersion extends GridCacheVersion {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
new file mode 100644
index 0000000..66ffe61
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSystemTransactionsSelfTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
+ * Tests that system transactions do not interact with user transactions.
+ */
+public class IgniteCacheSystemTransactionsSelfTest extends 
GridCacheAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) 
throws Exception {
+        CacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        for (String cacheName : new String[] {null, CU.UTILITY_CACHE_NAME, 
CU.MARSH_CACHE_NAME}) {
+            IgniteKernal kernal = (IgniteKernal)ignite(0);
+
+            GridCacheAdapter<Object, Object> cache = 
kernal.context().cache().internalCache(cacheName);
+
+            cache.removeAll(F.asList("1", "2", "3"));
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSystemTxInsideUserTx() throws Exception {
+        IgniteKernal ignite = (IgniteKernal)grid(0);
+
+        IgniteCache<Object, Object> jcache = ignite.jcache(null);
+
+        try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+            jcache.get("1");
+            jcache.put("1", "11");
+
+            GridCacheAdapter<Object, Object> utilityCache = 
ignite.context().cache().utilityCache();
+
+            utilityCache.putIfAbsent("2", "2");
+
+            try (IgniteInternalTx itx = utilityCache.txStartEx(PESSIMISTIC, 
REPEATABLE_READ)) {
+                assertEquals(null, utilityCache.get("1"));
+                assertEquals("2", utilityCache.get("2"));
+                assertEquals(null, utilityCache.get("3"));
+
+                utilityCache.put("3", "3");
+
+                itx.commit();
+            }
+
+            jcache.put("2", "22");
+
+            tx.commit();
+        }
+
+        checkTransactionsCommitted();
+
+        checkEntries(null,                  "1", "11", "2", "22", "3", null);
+        checkEntries(CU.UTILITY_CACHE_NAME, "1", null, "2", "2",  "3", "3");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSystemMarshallerTxInsideSystemTx() throws Exception {
+        IgniteKernal ignite = (IgniteKernal)grid(0);
+
+        GridCacheAdapter<Object, Object> utilityCache = 
ignite.context().cache().utilityCache();
+
+        try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, 
REPEATABLE_READ)) {
+            utilityCache.get("1");
+            utilityCache.put("1", "11");
+
+            CacheProjection<String,String> marshallerCache = 
(GridCacheAdapter<String, 
String>)(GridCacheAdapter)ignite.context().cache().marshallerCache();
+
+            marshallerCache.putIfAbsent("2", "2");
+
+            try (IgniteInternalTx itx = marshallerCache.txStartEx(PESSIMISTIC, 
REPEATABLE_READ)) {
+                assertEquals(null, marshallerCache.get("1"));
+                assertEquals("2", marshallerCache.get("2"));
+                assertEquals(null, marshallerCache.get("3"));
+
+                marshallerCache.put("3", "3");
+
+                itx.commit();
+            }
+
+            utilityCache.put("2", "22");
+
+            tx.commit();
+        }
+
+        checkTransactionsCommitted();
+
+        checkEntries(CU.UTILITY_CACHE_NAME, "1", "11", "2", "22", "3", null);
+        checkEntries(CU.MARSH_CACHE_NAME,   "1", null, "2", "2",  "3", "3");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkTransactionsCommitted() throws Exception {
+        for (int i = 0; i < gridCount(); i++) {
+            IgniteKernal kernal = (IgniteKernal)grid(i);
+
+            IgniteTxManager<Object, Object> tm = 
kernal.context().cache().context().tm();
+
+            Map map = U.field(tm, "threadMap");
+
+            assertEquals(0, map.size());
+
+            map = U.field(tm, "sysThreadMap");
+
+            assertEquals(0, map.size());
+
+            map = U.field(tm, "idMap");
+
+            assertEquals(0, map.size());
+        }
+    }
+
+    /**
+     * @param cacheName Cache to check.
+     * @param vals Alternating keys and expected values (key1, val1, key2, val2, ...); keys absent on a node are skipped.
+     * @throws Exception If failed.
+     */
+    private void checkEntries(String cacheName, Object... vals) throws 
Exception {
+        for (int g = 0; g < gridCount(); g++) {
+            IgniteKernal kernal = (IgniteKernal)grid(g);
+
+            GridCacheAdapter<Object, Object> cache = 
kernal.context().cache().internalCache(cacheName);
+
+            for (int i = 0; i < vals.length; i += 2) {
+                Object key = vals[i];
+                Object val = vals[i + 1];
+
+                GridCacheEntryEx<Object, Object> entry = cache.peekEx(key);
+
+                if (entry != null) {
+                    assertFalse("Entry is locked [g=" + g + ", cacheName=" + 
cacheName + ", entry=" + entry + ']',
+                        entry.lockedByAny());
+
+                    assertEquals("Invalid entry value [g=" + g + ", 
cacheName=" + cacheName + ", entry=" + entry + ']',
+                        val, entry.rawGet());
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 6b3eb4a..1fa67c5 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -142,6 +142,7 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheReplicatedLocalStoreSelfTest.class);
         
suite.addTestSuite(GridCachePartitionedOffHeapLocalStoreSelfTest.class);
         suite.addTestSuite(GridCacheTxPartitionedLocalStoreSelfTest.class);
+        suite.addTestSuite(IgniteCacheSystemTransactionsSelfTest.class);
 
         // Heuristic exception handling. TODO IGNITE-257
 //        suite.addTestSuite(GridCacheColocatedTxExceptionSelfTest.class);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb13a9a0/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
----------------------------------------------------------------------
diff --git 
a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
 
b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
index 6077e4a..b8bf599 100644
--- 
a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
+++ 
b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
@@ -80,7 +80,7 @@ public class CacheJtaManager<K, V> extends 
CacheJtaManagerAdapter<K, V> {
                             tx = cctx.tm().newTx(
                                 /*implicit*/false,
                                 /*implicit single*/false,
-                                /*system*/false,
+                                null,
                                 tCfg.getDefaultTxConcurrency(),
                                 tCfg.getDefaultTxIsolation(),
                                 tCfg.getDefaultTxTimeout(),
@@ -92,7 +92,7 @@ public class CacheJtaManager<K, V> extends 
CacheJtaManagerAdapter<K, V> {
                             );
                         }
 
-                        rsrc = new GridCacheXAResource((IgniteInternalTx)tx, 
cctx);
+                        rsrc = new GridCacheXAResource(tx, cctx);
 
                         if (!jtaTx.enlistResource(rsrc))
                             throw new IgniteCheckedException("Failed to enlist 
XA resource to JTA user transaction.");

Reply via email to