# ignite-80

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/821c016b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/821c016b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/821c016b

Branch: refs/heads/ignite-80
Commit: 821c016b11919ad98d0c7caaec938bc4506a32f6
Parents: ae43a30
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Apr 7 13:22:14 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Apr 7 17:39:47 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgnitionEx.java  |  18 +-
 .../datastructures/DataStructuresProcessor.java | 450 +++++++------------
 .../IgniteCacheDataStructuresSelfTestSuite.java |   5 +-
 3 files changed, 178 insertions(+), 295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/821c016b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index fe81006..f7e19d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1679,8 +1679,6 @@ public class IgnitionEx {
 
             final boolean hasHadoop = IgniteComponentType.HADOOP.inClassPath();
 
-            final boolean hasAtomics = cfg.getAtomicConfiguration() != null;
-
             final boolean clientDisco = cfg.getDiscoverySpi() instanceof 
TcpClientDiscoverySpi;
 
             CacheConfiguration[] copies;
@@ -1709,14 +1707,11 @@ public class IgnitionEx {
                             "\" because it is reserved for internal 
purposes.");
                 }
 
-                int addCacheCnt = 2; // Always add marshaller and utility 
caches.
+                int addCacheCnt = 3; // Always add marshaller, utility, 
atomics caches.
 
                 if (hasHadoop)
                     addCacheCnt++;
 
-                if (hasAtomics)
-                    addCacheCnt++;
-
                 copies = new CacheConfiguration[cacheCfgs.length + 
addCacheCnt];
 
                 int cloneIdx = 2;
@@ -1724,21 +1719,17 @@ public class IgnitionEx {
                 if (hasHadoop)
                     copies[cloneIdx++] = CU.hadoopSystemCache();
 
-                if (hasAtomics)
-                    copies[cloneIdx++] = 
atomicsSystemCache(cfg.getAtomicConfiguration(), clientDisco);
+                copies[cloneIdx++] = 
atomicsSystemCache(cfg.getAtomicConfiguration(), clientDisco);
 
                 for (CacheConfiguration ccfg : cacheCfgs)
                     copies[cloneIdx++] = new CacheConfiguration(ccfg);
             }
             else {
-                int cacheCnt = 2; // Always add marshaller and utility caches.
+                int cacheCnt = 3; // Always add marshaller, utility, atomics 
caches.
 
                 if (hasHadoop)
                     cacheCnt++;
 
-                if (hasAtomics)
-                    cacheCnt++;
-
                 copies = new CacheConfiguration[cacheCnt];
 
                 int cacheIdx = 2;
@@ -1746,8 +1737,7 @@ public class IgnitionEx {
                 if (hasHadoop)
                     copies[cacheIdx++] = CU.hadoopSystemCache();
 
-                if (hasAtomics)
-                    copies[cacheIdx] = 
atomicsSystemCache(cfg.getAtomicConfiguration(), clientDisco);
+                copies[cacheIdx] = 
atomicsSystemCache(cfg.getAtomicConfiguration(), clientDisco);
             }
 
             // Always add marshaller and utility caches.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/821c016b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 1ef5c55..3b20372 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -88,7 +88,7 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
     private final AtomicConfiguration atomicCfg;
 
     /** */
-    private GridCacheProjectionEx<CacheDataStructuresConfigurationKey, 
Map<String, DataStructureInfo>> utilityCache;
+    private GridCacheProjectionEx<CacheDataStructuresConfigurationKey, 
Map<String, DataStructureInfo>> dsInfoView;
 
     /**
      * @param ctx Context.
@@ -107,29 +107,25 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
         if (ctx.config().isDaemon())
             return;
 
-        utilityCache = (GridCacheProjectionEx)ctx.cache().utilityCache();
+        GridCache atomicsCache = ctx.cache().atomicsCache();
 
-        assert utilityCache != null;
+        assert atomicsCache != null;
 
-        if (atomicCfg != null) {
-            GridCache atomicsCache = ctx.cache().atomicsCache();
+        dsInfoView = (GridCacheProjectionEx)atomicsCache;
 
-            assert atomicsCache != null;
+        dsView = atomicsCache;
 
-            dsView = atomicsCache;
+        cntDownLatchView = atomicsCache;
 
-            cntDownLatchView = atomicsCache;
+        atomicLongView = atomicsCache;
 
-            atomicLongView = atomicsCache;
+        atomicRefView = atomicsCache;
 
-            atomicRefView = atomicsCache;
+        atomicStampedView = atomicsCache;
 
-            atomicStampedView = atomicsCache;
+        seqView = atomicsCache;
 
-            seqView = atomicsCache;
-
-            dsCacheCtx = 
ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME).context();
-        }
+        dsCacheCtx = 
ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME).context();
     }
 
     /**
@@ -150,13 +146,11 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
-        return getAtomic(new IgniteOutClosureX<IgniteAtomicSequence>() {
-            @Override public IgniteAtomicSequence applyx() throws 
IgniteCheckedException {
+        return getAtomic(new IgniteClosureX<IgniteInternalTx, 
IgniteAtomicSequence>() {
+            @Override public IgniteAtomicSequence applyx(IgniteInternalTx tx) 
throws IgniteCheckedException {
                 GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
 
-                dsCacheCtx.gate().enter();
-
-                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                try {
                     GridCacheAtomicSequenceValue seqVal = 
cast(dsView.get(key), GridCacheAtomicSequenceValue.class);
 
                     // Check that sequence hasn't been created in other thread 
yet.
@@ -209,8 +203,6 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
 
                     dsMap.put(key, seq);
 
-                    tx.commit();
-
                     return seq;
                 }
                 catch (Error | Exception e) {
@@ -220,9 +212,6 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
 
                     throw e;
                 }
-                finally {
-                    dsCacheCtx.gate().leave();
-                }
             }
         }, new DataStructureInfo(name, ATOMIC_SEQ, null), create, 
IgniteAtomicSequence.class);
     }
@@ -238,21 +227,11 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
-        removeDataStructure(new IgniteCallable<Void>() {
-            @Override public Void call() throws Exception {
-                dsCacheCtx.gate().enter();
-
-                try {
-                    GridCacheInternal key = new GridCacheInternalKeyImpl(name);
+        removeDataStructure(new IgniteClosureX<IgniteInternalTx, Void>() {
+            @Override public Void applyx(IgniteInternalTx tx) throws 
IgniteCheckedException {
+                GridCacheInternal key = new GridCacheInternalKeyImpl(name);
 
-                    removeInternal(key, GridCacheAtomicSequenceValue.class);
-                }
-                catch (Exception e) {
-                    throw new IgniteCheckedException("Failed to remove 
sequence by name: " + name, e);
-                }
-                finally {
-                    dsCacheCtx.gate().leave();
-                }
+                removeInternal(tx, key, GridCacheAtomicSequenceValue.class);
 
                 return null;
             }
@@ -276,13 +255,11 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
-        return getAtomic(new IgniteOutClosureX<IgniteAtomicLong>() {
-            @Override public IgniteAtomicLong applyx() throws 
IgniteCheckedException {
+        return getAtomic(new IgniteClosureX<IgniteInternalTx, 
IgniteAtomicLong>() {
+            @Override public IgniteAtomicLong applyx(IgniteInternalTx tx) 
throws IgniteCheckedException {
                 final GridCacheInternalKey key = new 
GridCacheInternalKeyImpl(name);
 
-                dsCacheCtx.gate().enter();
-
-                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                try {
                     GridCacheAtomicLongValue val = cast(dsView.get(key), 
GridCacheAtomicLongValue.class);
 
                     // Check that atomic long hasn't been created in other 
thread yet.
@@ -307,8 +284,6 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
 
                     dsMap.put(key, a);
 
-                    tx.commit();
-
                     return a;
                 }
                 catch (Error | Exception e) {
@@ -318,9 +293,6 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
 
                     throw e;
                 }
-                finally {
-                    dsCacheCtx.gate().leave();
-                }
             }
         }, new DataStructureInfo(name, ATOMIC_LONG, null), create, 
IgniteAtomicLong.class);
     }
@@ -333,49 +305,51 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
      * @return Data structure instance.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable private <T> T getAtomic(final IgniteOutClosureX<T> c,
+    @Nullable private <T> T getAtomic(final IgniteClosureX<IgniteInternalTx, 
T> c,
         DataStructureInfo dsInfo,
         boolean create,
         Class<? extends T> cls)
         throws IgniteCheckedException
     {
-        Map<String, DataStructureInfo> dsMap = 
utilityCache.get(DATA_STRUCTURES_KEY);
+        dsCacheCtx.gate().enter();
 
-        if (!create && (dsMap == null || !dsMap.containsKey(dsInfo.name)))
-            return null;
+        try {
+            Map<String, DataStructureInfo> dsMap = 
dsInfoView.get(DATA_STRUCTURES_KEY);
 
-        IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, 
create);
+            if (!create && (dsMap == null || !dsMap.containsKey(dsInfo.name)))
+                return null;
 
-        if (err != null)
-            throw err;
+            IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, 
create);
 
-        final GridCacheInternalKey key = new 
GridCacheInternalKeyImpl(dsInfo.name);
+            if (err != null)
+                throw err;
 
-        // Check type of structure received by key from local cache.
-        T dataStructure = cast(this.dsMap.get(key), cls);
+            final GridCacheInternalKey key = new 
GridCacheInternalKeyImpl(dsInfo.name);
 
-        if (dataStructure != null)
-            return dataStructure;
+            // Check type of structure received by key from local cache.
+            T dataStructure = cast(this.dsMap.get(key), cls);
 
-        if (!create)
-            return c.applyx();
+            if (dataStructure != null)
+                return dataStructure;
 
-        try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, 
REPEATABLE_READ)) {
-            err = utilityCache.invoke(DATA_STRUCTURES_KEY, new 
AddAtomicProcessor(dsInfo)).get();
+            try (IgniteInternalTx tx = dsInfoView.txStartEx(PESSIMISTIC, 
REPEATABLE_READ)) {
+                if (create) {
+                    err = dsInfoView.invoke(DATA_STRUCTURES_KEY, new 
AddAtomicProcessor(dsInfo)).get();
 
-            if (err != null)
-                throw err;
-
-            dataStructure = ctx.closure().callLocalSafe(new Callable<T>() {
-                @Override public T call() throws Exception {
-                    return c.applyx();
+                    if (err != null)
+                        throw err;
                 }
-            }, false).get();
 
-            tx.commit();
-        }
+                dataStructure = c.applyx(tx);
 
-        return dataStructure;
+                tx.commit();
+            }
+
+            return dataStructure;
+        }
+        finally {
+            dsCacheCtx.gate().leave();
+        }
     }
 
     /**
@@ -388,19 +362,9 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
         assert name != null;
         assert dsCacheCtx != null;
 
-        removeDataStructure(new IgniteCallable<Void>() {
-            @Override public Void call() throws Exception {
-                dsCacheCtx.gate().enter();
-
-                try {
-                    removeInternal(new GridCacheInternalKeyImpl(name), 
GridCacheAtomicLongValue.class);
-                }
-                catch (Exception e) {
-                    throw new IgniteCheckedException("Failed to remove atomic 
long by name: " + name, e);
-                }
-                finally {
-                    dsCacheCtx.gate().leave();
-                }
+        removeDataStructure(new IgniteClosureX<IgniteInternalTx, Void>() {
+            @Override public Void applyx(IgniteInternalTx tx) throws 
IgniteCheckedException {
+                removeInternal(tx, new GridCacheInternalKeyImpl(name), 
GridCacheAtomicLongValue.class);
 
                 return null;
             }
@@ -414,45 +378,52 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
      * @param afterRmv Optional closure to run after data structure removed.
      * @throws IgniteCheckedException If failed.
      */
-    private <T> void removeDataStructure(IgniteCallable<T> c,
+    private <T> void removeDataStructure(IgniteClosureX<IgniteInternalTx, T> c,
         String name,
         DataStructureType type,
         @Nullable IgniteInClosureX<T> afterRmv)
         throws IgniteCheckedException
     {
-        Map<String, DataStructureInfo> dsMap = 
utilityCache.get(DATA_STRUCTURES_KEY);
-
-        if (dsMap == null || !dsMap.containsKey(name))
-            return;
-
-        DataStructureInfo dsInfo = new DataStructureInfo(name, type, null);
+        T rmvInfo;
 
-        IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, 
false);
+        dsCacheCtx.gate().enter();
 
-        if (err != null)
-            throw err;
+        try {
+            Map<String, DataStructureInfo> dsMap = 
dsInfoView.get(DATA_STRUCTURES_KEY);
 
-        T rmvInfo;
+            if (dsMap == null || !dsMap.containsKey(name))
+                return;
 
-        try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, 
REPEATABLE_READ)) {
-            T2<Boolean, IgniteCheckedException> res =
-                utilityCache.invoke(DATA_STRUCTURES_KEY, new 
RemoveDataStructureProcessor(dsInfo)).get();
+            DataStructureInfo dsInfo = new DataStructureInfo(name, type, null);
 
-            err = res.get2();
+            IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, 
false);
 
             if (err != null)
                 throw err;
 
-            assert res.get1() != null;
+            try (IgniteInternalTx tx = dsInfoView.txStartEx(PESSIMISTIC, 
REPEATABLE_READ)) {
+                T2<Boolean, IgniteCheckedException> res =
+                    dsInfoView.invoke(DATA_STRUCTURES_KEY, new 
RemoveDataStructureProcessor(dsInfo)).get();
 
-            boolean exists = res.get1();
+                err = res.get2();
 
-            if (!exists)
-                return;
+                if (err != null)
+                    throw err;
+
+                assert res.get1() != null;
 
-            rmvInfo = ctx.closure().callLocalSafe(c, false).get();
+                boolean exists = res.get1();
 
-            tx.commit();
+                if (!exists)
+                    return;
+
+                rmvInfo = c.applyx(tx);
+
+                tx.commit();
+            }
+        }
+        finally {
+            dsCacheCtx.gate().leave();
         }
 
         if (afterRmv != null && rmvInfo != null)
@@ -479,13 +450,11 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
-        return getAtomic(new IgniteOutClosureX<IgniteAtomicReference>() {
-            @Override public IgniteAtomicReference<T> applyx() throws 
IgniteCheckedException {
+        return getAtomic(new IgniteClosureX<IgniteInternalTx, 
IgniteAtomicReference>() {
+            @Override public IgniteAtomicReference<T> applyx(IgniteInternalTx 
tx) throws IgniteCheckedException {
                 GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
 
-                dsCacheCtx.gate().enter();
-
-                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                try {
                     GridCacheAtomicReferenceValue val = cast(dsView.get(key),
                         GridCacheAtomicReferenceValue.class);
 
@@ -512,8 +481,6 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
 
                     dsMap.put(key, ref);
 
-                    tx.commit();
-
                     return ref;
                 }
                 catch (Error | Exception e) {
@@ -523,9 +490,6 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
 
                     throw e;
                 }
-                finally {
-                    dsCacheCtx.gate().leave();
-                }
             }
         }, new DataStructureInfo(name, ATOMIC_REF, null), create, 
IgniteAtomicReference.class);
     }
@@ -540,21 +504,11 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
         assert name != null;
         assert dsCacheCtx != null;
 
-        removeDataStructure(new IgniteCallable<Void>() {
-            @Override public Void call() throws Exception {
-                dsCacheCtx.gate().enter();
-
-                try {
-                    GridCacheInternal key = new GridCacheInternalKeyImpl(name);
+        removeDataStructure(new IgniteClosureX<IgniteInternalTx, Void>() {
+            @Override public Void applyx(IgniteInternalTx tx) throws 
IgniteCheckedException {
+                GridCacheInternal key = new GridCacheInternalKeyImpl(name);
 
-                    removeInternal(key, GridCacheAtomicReferenceValue.class);
-                }
-                catch (Exception e) {
-                    throw new IgniteCheckedException("Failed to remove atomic 
reference by name: " + name, e);
-                }
-                finally {
-                    dsCacheCtx.gate().leave();
-                }
+                removeInternal(tx, key, GridCacheAtomicReferenceValue.class);
 
                 return null;
             }
@@ -580,15 +534,12 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
-        return getAtomic(new IgniteOutClosureX<IgniteAtomicStamped>() {
-            @Override public IgniteAtomicStamped<T, S> applyx() throws 
IgniteCheckedException {
+        return getAtomic(new IgniteClosureX<IgniteInternalTx, 
IgniteAtomicStamped>() {
+            @Override public IgniteAtomicStamped<T, S> applyx(IgniteInternalTx 
tx) throws IgniteCheckedException {
                 GridCacheInternalKeyImpl key = new 
GridCacheInternalKeyImpl(name);
 
-                dsCacheCtx.gate().enter();
-
-                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheAtomicStampedValue val = cast(dsView.get(key),
-                        GridCacheAtomicStampedValue.class);
+                try {
+                    GridCacheAtomicStampedValue val = cast(dsView.get(key), 
GridCacheAtomicStampedValue.class);
 
                     // Check that atomic stamped hasn't been created in other 
thread yet.
                     GridCacheAtomicStampedEx stmp = cast(dsMap.get(key),
@@ -613,8 +564,6 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
 
                     dsMap.put(key, stmp);
 
-                    tx.commit();
-
                     return stmp;
                 }
                 catch (Error | Exception e) {
@@ -624,9 +573,6 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
 
                     throw e;
                 }
-                finally {
-                    dsCacheCtx.gate().leave();
-                }
             }
         }, new DataStructureInfo(name, ATOMIC_STAMPED, null), create, 
IgniteAtomicStamped.class);
     }
@@ -641,21 +587,11 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
         assert name != null;
         assert dsCacheCtx != null;
 
-        removeDataStructure(new IgniteCallable<Void>() {
-            @Override public Void call() throws Exception {
-                dsCacheCtx.gate().enter();
+        removeDataStructure(new IgniteClosureX<IgniteInternalTx, Void>() {
+            @Override public Void applyx(IgniteInternalTx tx) throws 
IgniteCheckedException {
+                GridCacheInternal key = new GridCacheInternalKeyImpl(name);
 
-                try {
-                    GridCacheInternal key = new GridCacheInternalKeyImpl(name);
-
-                    removeInternal(key, GridCacheAtomicStampedValue.class);
-                }
-                catch (Exception e) {
-                    throw new IgniteCheckedException("Failed to remove atomic 
stamped by name: " + name, e);
-                }
-                finally {
-                    dsCacheCtx.gate().leave();
-                }
+                removeInternal(tx, key, GridCacheAtomicStampedValue.class);
 
                 return null;
             }
@@ -712,11 +648,12 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
         assert name != null;
         assert cctx != null;
 
-        IgniteCallable<GridCacheQueueHeader> rmv = new 
IgniteCallable<GridCacheQueueHeader>() {
-            @Override public GridCacheQueueHeader call() throws Exception {
-                return (GridCacheQueueHeader)retryRemove(cctx.cache(), new 
GridCacheQueueHeaderKey(name));
-            }
-        };
+        IgniteClosureX<IgniteInternalTx, GridCacheQueueHeader> rmv =
+            new IgniteClosureX<IgniteInternalTx, GridCacheQueueHeader>() {
+                @Override public GridCacheQueueHeader applyx(IgniteInternalTx 
tx) throws IgniteCheckedException {
+                    return (GridCacheQueueHeader)cctx.cache().remove(new 
GridCacheQueueHeaderKey(name), null);
+                }
+            };
 
         CIX1<GridCacheQueueHeader> afterRmv = new CIX1<GridCacheQueueHeader>() 
{
             @Override public void applyx(GridCacheQueueHeader hdr) throws 
IgniteCheckedException {
@@ -748,53 +685,56 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
         boolean create)
         throws IgniteCheckedException
     {
-        Map<String, DataStructureInfo> dsMap = 
utilityCache.get(DATA_STRUCTURES_KEY);
+        dsCacheCtx.gate().enter();
 
-        if (!create && (dsMap == null || !dsMap.containsKey(dsInfo.name)))
-            return null;
+        try {
+            Map<String, DataStructureInfo> dsMap = 
dsInfoView.get(DATA_STRUCTURES_KEY);
 
-        IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, 
create);
+            if (!create && (dsMap == null || !dsMap.containsKey(dsInfo.name)))
+                return null;
 
-        if (err != null)
-            throw err;
+            IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, 
create);
 
-        if (!create) {
-            DataStructureInfo oldInfo = dsMap.get(dsInfo.name);
+            if (err != null)
+                throw err;
 
-            assert oldInfo.info instanceof CollectionInfo : oldInfo.info;
+            if (!create) {
+                DataStructureInfo oldInfo = dsMap.get(dsInfo.name);
 
-            String cacheName = ((CollectionInfo)oldInfo.info).cacheName;
+                assert oldInfo.info instanceof CollectionInfo : oldInfo.info;
 
-            GridCacheContext cacheCtx = 
ctx.cache().internalCache(cacheName).context();
+                String cacheName = ((CollectionInfo)oldInfo.info).cacheName;
 
-            return c.applyx(cacheCtx);
-        }
+                GridCacheContext cacheCtx = 
ctx.cache().internalCache(cacheName).context();
 
-        T col;
+                return c.applyx(cacheCtx);
+            }
 
-        try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, 
REPEATABLE_READ)) {
-            T2<String, IgniteCheckedException> res =
-                utilityCache.invoke(DATA_STRUCTURES_KEY, new 
AddCollectionProcessor(dsInfo)).get();
+            T col;
 
-            err = res.get2();
+            try (IgniteInternalTx tx = dsInfoView.txStartEx(PESSIMISTIC, 
REPEATABLE_READ)) {
+                T2<String, IgniteCheckedException> res =
+                    dsInfoView.invoke(DATA_STRUCTURES_KEY, new 
AddCollectionProcessor(dsInfo)).get();
 
-            if (err != null)
-                throw err;
+                err = res.get2();
 
-            String cacheName = res.get1();
+                if (err != null)
+                    throw err;
 
-            final GridCacheContext cacheCtx = 
ctx.cache().internalCache(cacheName).context();
+                String cacheName = res.get1();
 
-            col = ctx.closure().callLocalSafe(new Callable<T>() {
-                @Override public T call() throws Exception {
-                    return c.applyx(cacheCtx);
-                }
-            }, false).get();
+                final GridCacheContext cacheCtx = 
ctx.cache().internalCache(cacheName).context();
 
-            tx.commit();
-        }
+                col = c.applyx(cacheCtx);
+
+                tx.commit();
+            }
 
-        return col;
+            return col;
+        }
+        finally {
+            dsCacheCtx.gate().leave();
+        }
     }
 
     /**
@@ -846,15 +786,12 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
-        return getAtomic(new IgniteOutClosureX<IgniteCountDownLatch>() {
-            @Override public IgniteCountDownLatch applyx() throws 
IgniteCheckedException {
+        return getAtomic(new IgniteClosureX<IgniteInternalTx, 
IgniteCountDownLatch>() {
+            @Override public IgniteCountDownLatch applyx(IgniteInternalTx tx) 
throws IgniteCheckedException {
                 GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
 
-                dsCacheCtx.gate().enter();
-
-                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheCountDownLatchValue val = cast(dsView.get(key),
-                        GridCacheCountDownLatchValue.class);
+                try {
+                    GridCacheCountDownLatchValue val = cast(dsView.get(key), 
GridCacheCountDownLatchValue.class);
 
                     // Check that count down hasn't been created in other 
thread yet.
                     GridCacheCountDownLatchEx latch = cast(dsMap.get(key), 
GridCacheCountDownLatchEx.class);
@@ -879,8 +816,6 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
 
                     dsMap.put(key, latch);
 
-                    tx.commit();
-
                     return latch;
                 }
                 catch (Error | Exception e) {
@@ -890,9 +825,6 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
 
                     throw e;
                 }
-                finally {
-                    dsCacheCtx.gate().leave();
-                }
             }
         }, new DataStructureInfo(name, COUNT_DOWN_LATCH, null), create, 
GridCacheCountDownLatchEx.class);
     }
@@ -907,38 +839,25 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
         assert name != null;
         assert dsCacheCtx != null;
 
-        removeDataStructure(new IgniteCallable<Void>() {
-            @Override
-            public Void call() throws Exception {
+        removeDataStructure(new IgniteClosureX<IgniteInternalTx, Void>() {
+            @Override public Void applyx(IgniteInternalTx tx) throws 
IgniteCheckedException {
                 GridCacheInternal key = new GridCacheInternalKeyImpl(name);
 
-                dsCacheCtx.gate().enter();
-
-                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
-                    // Check correctness type of removable object.
-                    GridCacheCountDownLatchValue val =
-                            cast(dsView.get(key), 
GridCacheCountDownLatchValue.class);
-
-                    if (val != null) {
-                        if (val.get() > 0) {
-                            throw new IgniteCheckedException("Failed to remove 
count down latch " +
-                                    "with non-zero count: " + val.get());
-                        }
-
-                        dsView.removex(key);
-
-                        tx.commit();
-                    } else
-                        tx.setRollbackOnly();
+                // Check correctness type of removable object.
+                GridCacheCountDownLatchValue val =
+                    cast(dsView.get(key), GridCacheCountDownLatchValue.class);
 
-                    return null;
-                } catch (Error | Exception e) {
-                    U.error(log, "Failed to remove data structure: " + key, e);
+                if (val != null) {
+                    if (val.get() > 0)
+                        throw new IgniteCheckedException("Failed to remove 
count down latch " +
+                            "with non-zero count: " + val.get());
 
-                    throw e;
-                } finally {
-                    dsCacheCtx.gate().leave();
+                    dsView.removex(key);
                 }
+                else
+                    tx.setRollbackOnly();
+
+                return null;
             }
         }, name, COUNT_DOWN_LATCH, null);
     }
@@ -946,38 +865,27 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
     /**
      * Remove internal entry by key from cache.
      *
+     * @param tx Transaction for internal atomics cache.
      * @param key Internal entry key.
      * @param cls Class of object which will be removed. If cached object has 
different type exception will be thrown.
      * @return Method returns true if sequence has been removed and false if 
it's not cached.
      * @throws IgniteCheckedException If removing failed or class of object is 
different to expected class.
      */
-    private <R> boolean removeInternal(final GridCacheInternal key, final 
Class<R> cls) throws IgniteCheckedException {
-        return CU.outTx(
-            new Callable<Boolean>() {
-                @Override public Boolean call() throws Exception {
-                    try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
-                        // Check correctness type of removable object.
-                        R val = cast(dsView.get(key), cls);
-
-                        if (val != null) {
-                            dsView.removex(key);
-
-                            tx.commit();
-                        }
-                        else
-                            tx.setRollbackOnly();
+    private <R> boolean removeInternal(
+        IgniteInternalTx tx,
+        GridCacheInternal key,
+        final Class<R> cls)
+        throws IgniteCheckedException
+    {
+        // Check correctness type of removable object.
+        R val = cast(dsView.get(key), cls);
 
-                        return val != null;
-                    }
-                    catch (Error | Exception e) {
-                        U.error(log, "Failed to remove data structure: " + 
key, e);
+        if (val != null)
+            dsView.removex(key);
+        else
+            tx.setRollbackOnly();
 
-                        throw e;
-                    }
-                }
-            },
-            dsCacheCtx
-        );
+        return val != null;
     }
 
     /**
@@ -1084,11 +992,12 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
         assert name != null;
         assert cctx != null;
 
-        IgniteCallable<GridCacheSetHeader> rmv = new 
IgniteCallable<GridCacheSetHeader>() {
-            @Override public GridCacheSetHeader call() throws Exception {
-                return (GridCacheSetHeader)retryRemove(cctx.cache(), new 
GridCacheSetHeaderKey(name));
-            }
-        };
+        IgniteClosureX<IgniteInternalTx, GridCacheSetHeader> rmv =
+            new IgniteClosureX<IgniteInternalTx, GridCacheSetHeader>() {
+                @Override public GridCacheSetHeader applyx(IgniteInternalTx 
tx) throws IgniteCheckedException {
+                    return (GridCacheSetHeader)cctx.cache().remove(new 
GridCacheSetHeaderKey(name), null);
+                }
+            };
 
         CIX1<GridCacheSetHeader> afterRmv = new CIX1<GridCacheSetHeader>() {
             @Override public void applyx(GridCacheSetHeader hdr) throws 
IgniteCheckedException {
@@ -1100,21 +1009,6 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * @param cache Cache.
-     * @param key Key to remove.
-     * @throws IgniteCheckedException If failed.
-     * @return Removed value.
-     */
-    @SuppressWarnings("unchecked")
-    @Nullable private <T> T retryRemove(final GridCache cache, final Object 
key) throws IgniteCheckedException {
-        return retry(log, new Callable<T>() {
-            @Nullable @Override public T call() throws Exception {
-                return (T)cache.remove(key);
-            }
-        });
-    }
-
-    /**
      * @param log Logger.
      * @param call Callable.
      * @return Callable result.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/821c016b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
index 99beb0f..497b6b0 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
@@ -62,9 +62,8 @@ public class IgniteCacheDataStructuresSelfTestSuite extends 
TestSuite {
         suite.addTest(new 
TestSuite(GridCachePartitionedAtomicQueueApiSelfTest.class));
         suite.addTest(new 
TestSuite(GridCachePartitionedQueueMultiNodeSelfTest.class));
         suite.addTest(new 
TestSuite(GridCachePartitionedAtomicQueueMultiNodeSelfTest.class));
-        // TODO: IGNITE-80.
-        //suite.addTest(new 
TestSuite(GridCachePartitionedQueueCreateMultiNodeSelfTest.class));
-        //suite.addTest(new 
TestSuite(GridCachePartitionedAtomicQueueCreateMultiNodeSelfTest.class));
+        suite.addTest(new 
TestSuite(GridCachePartitionedQueueCreateMultiNodeSelfTest.class));
+        suite.addTest(new 
TestSuite(GridCachePartitionedAtomicQueueCreateMultiNodeSelfTest.class));
         suite.addTest(new TestSuite(GridCachePartitionedSetSelfTest.class));
         suite.addTest(new 
TestSuite(IgnitePartitionedSetNoBackupsSelfTest.class));
         suite.addTest(new 
TestSuite(GridCachePartitionedAtomicSetSelfTest.class));

Reply via email to