Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-6 368dd6375 -> 9e8939828


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9e893982/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java
index 9704fad..951e42d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java
@@ -22,8 +22,12 @@ import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.transactions.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.transactions.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -86,7 +90,7 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
     private final IgniteAtomicConfiguration atomicCfg;
 
     /** */
-    private GridCache<CacheDataStructuresConfigurationKey, Map<String, 
DataStructureInfo>> utilityCache;
+    private GridCacheProjectionEx<CacheDataStructuresConfigurationKey, 
Map<String, DataStructureInfo>> utilityCache;
 
     /**
      * @param ctx Context.
@@ -105,7 +109,7 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
         if (ctx.config().isDaemon())
             return;
 
-        utilityCache = ctx.cache().utilityCache();
+        utilityCache = (GridCacheProjectionEx)ctx.cache().utilityCache();
 
         assert utilityCache != null;
 
@@ -153,119 +157,119 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
-        dsCacheCtx.gate().enter();
+        final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
 
-        try {
-            final GridCacheInternalKey key = new 
GridCacheInternalKeyImpl(name);
+        // Check type of structure received by key from local cache.
+        IgniteAtomicSequence val = cast(dsMap.get(key), 
IgniteAtomicSequence.class);
 
-            // Check type of structure received by key from local cache.
-            IgniteAtomicSequence val = cast(dsMap.get(key), 
IgniteAtomicSequence.class);
+        if (val != null)
+            return val;
 
-            if (val != null)
-                return val;
+        return getAtomic(new Callable<IgniteAtomicSequence>() {
+            @Override public IgniteAtomicSequence call() throws Exception {
+                dsCacheCtx.gate().enter();
 
-            return CU.outTx(new Callable<IgniteAtomicSequence>() {
-                @Override public IgniteAtomicSequence call() throws Exception {
-                    try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
-                        GridCacheAtomicSequenceValue seqVal = 
cast(dsView.get(key), GridCacheAtomicSequenceValue.class);
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    GridCacheAtomicSequenceValue seqVal = 
cast(dsView.get(key), GridCacheAtomicSequenceValue.class);
 
-                        // Check that sequence hasn't been created in other 
thread yet.
-                        GridCacheAtomicSequenceEx seq = cast(dsMap.get(key), 
GridCacheAtomicSequenceEx.class);
+                    // Check that sequence hasn't been created in other thread 
yet.
+                    GridCacheAtomicSequenceEx seq = cast(dsMap.get(key), 
GridCacheAtomicSequenceEx.class);
 
-                        if (seq != null) {
-                            assert seqVal != null;
+                    if (seq != null) {
+                        assert seqVal != null;
 
-                            return seq;
-                        }
+                        return seq;
+                    }
 
-                        if (seqVal == null && !create)
-                            return null;
+                    if (seqVal == null && !create)
+                        return null;
 
-                        // We should use offset because we already reserved 
left side of range.
-                        long off = atomicCfg.getAtomicSequenceReserveSize() > 
1 ?
-                            atomicCfg.getAtomicSequenceReserveSize() - 1 : 1;
+                    // We should use offset because we already reserved left 
side of range.
+                    long off = atomicCfg.getAtomicSequenceReserveSize() > 1 ?
+                        atomicCfg.getAtomicSequenceReserveSize() - 1 : 1;
 
-                        long upBound;
-                        long locCntr;
+                    long upBound;
+                    long locCntr;
 
-                        if (seqVal == null) {
-                            locCntr = initVal;
+                    if (seqVal == null) {
+                        locCntr = initVal;
 
-                            upBound = locCntr + off;
+                        upBound = locCntr + off;
 
-                            // Global counter must be more than reserved 
region.
-                            seqVal = new GridCacheAtomicSequenceValue(upBound 
+ 1);
-                        }
-                        else {
-                            locCntr = seqVal.get();
+                        // Global counter must be more than reserved region.
+                        seqVal = new GridCacheAtomicSequenceValue(upBound + 1);
+                    }
+                    else {
+                        locCntr = seqVal.get();
 
-                            upBound = locCntr + off;
+                        upBound = locCntr + off;
 
-                            // Global counter must be more than reserved 
region.
-                            seqVal.set(upBound + 1);
-                        }
+                        // Global counter must be more than reserved region.
+                        seqVal.set(upBound + 1);
+                    }
 
-                        // Update global counter.
-                        dsView.putx(key, seqVal);
+                    // Update global counter.
+                    dsView.putx(key, seqVal);
 
-                        // Only one thread can be in the transaction scope and 
create sequence.
-                        seq = new GridCacheAtomicSequenceImpl(name,
-                            key,
-                            seqView,
-                            dsCacheCtx,
-                            atomicCfg.getAtomicSequenceReserveSize(),
-                            locCntr,
-                            upBound);
+                    // Only one thread can be in the transaction scope and 
create sequence.
+                    seq = new GridCacheAtomicSequenceImpl(name,
+                        key,
+                        seqView,
+                        dsCacheCtx,
+                        atomicCfg.getAtomicSequenceReserveSize(),
+                        locCntr,
+                        upBound);
 
-                        dsMap.put(key, seq);
+                    dsMap.put(key, seq);
 
-                        tx.commit();
+                    tx.commit();
 
-                        return seq;
-                    }
-                    catch (Error | Exception e) {
-                        dsMap.remove(key);
+                    return seq;
+                }
+                catch (Error | Exception e) {
+                    dsMap.remove(key);
 
-                        U.error(log, "Failed to make atomic sequence: " + 
name, e);
+                    U.error(log, "Failed to make atomic sequence: " + name, e);
 
-                        throw e;
-                    }
+                    throw e;
                 }
-            }, dsCacheCtx);
-        }
-        catch (Exception e) {
-            throw new IgniteCheckedException("Failed to get sequence by name: 
" + name, e);
-        }
-        finally {
-            dsCacheCtx.gate().leave();
-        }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        }, new DataStructureInfo(name, DataStructureType.ATOMIC_SEQ, null), 
create);
     }
 
     /**
      * Removes sequence from cache.
      *
      * @param name Sequence name.
-     * @return Method returns {@code true} if sequence has been removed and 
{@code false} if it's not cached.
      * @throws IgniteCheckedException If removing failed.
      */
-    public final boolean removeSequence(String name) throws 
IgniteCheckedException {
+    public final void removeSequence(final String name) throws 
IgniteCheckedException {
         assert name != null;
 
         checkAtomicsConfiguration();
 
-        dsCacheCtx.gate().enter();
+        removeDataStructure(new IgniteCallable<Void>() {
+            @Override public Void call() throws Exception {
+                dsCacheCtx.gate().enter();
 
-        try {
-            GridCacheInternal key = new GridCacheInternalKeyImpl(name);
+                try {
+                    GridCacheInternal key = new GridCacheInternalKeyImpl(name);
 
-            return removeInternal(key, GridCacheAtomicSequenceValue.class);
-        }
-        catch (Exception e) {
-            throw new IgniteCheckedException("Failed to remove sequence by 
name: " + name, e);
-        }
-        finally {
-            dsCacheCtx.gate().leave();
-        }
+                    removeInternal(key, GridCacheAtomicSequenceValue.class);
+                }
+                catch (Exception e) {
+                    throw new IgniteCheckedException("Failed to remove 
sequence by name: " + name, e);
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+
+                return null;
+            }
+        }, name, DataStructureType.ATOMIC_SEQ, null);
     }
 
     /**
@@ -284,64 +288,111 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
-        dsCacheCtx.gate().enter();
+        final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
 
-        try {
-            final GridCacheInternalKey key = new 
GridCacheInternalKeyImpl(name);
+        // Check type of structure received by key from local cache.
+        IgniteAtomicLong atomicLong = cast(dsMap.get(key), 
IgniteAtomicLong.class);
 
-            // Check type of structure received by key from local cache.
-            IgniteAtomicLong atomicLong = cast(dsMap.get(key), 
IgniteAtomicLong.class);
+        if (atomicLong != null)
+            return atomicLong;
 
-            if (atomicLong != null)
-                return atomicLong;
+        return getAtomic(new Callable<IgniteAtomicLong>() {
+            @Override public IgniteAtomicLong call() throws Exception {
+                dsCacheCtx.gate().enter();
 
-            return CU.outTx(new Callable<IgniteAtomicLong>() {
-                @Override public IgniteAtomicLong call() throws Exception {
-                    try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
-                        GridCacheAtomicLongValue val = cast(dsView.get(key), 
GridCacheAtomicLongValue.class);
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    GridCacheAtomicLongValue val = cast(dsView.get(key), 
GridCacheAtomicLongValue.class);
 
-                        // Check that atomic long hasn't been created in other 
thread yet.
-                        GridCacheAtomicLongEx a = cast(dsMap.get(key), 
GridCacheAtomicLongEx.class);
+                    // Check that atomic long hasn't been created in other 
thread yet.
+                    GridCacheAtomicLongEx a = cast(dsMap.get(key), 
GridCacheAtomicLongEx.class);
 
-                        if (a != null) {
-                            assert val != null;
+                    if (a != null) {
+                        assert val != null;
 
-                            return a;
-                        }
+                        return a;
+                    }
 
-                        if (val == null && !create)
-                            return null;
+                    if (val == null && !create)
+                        return null;
 
-                        if (val == null) {
-                            val = new GridCacheAtomicLongValue(initVal);
+                    if (val == null) {
+                        val = new GridCacheAtomicLongValue(initVal);
 
-                            dsView.putx(key, val);
-                        }
+                        dsView.putx(key, val);
+                    }
 
-                        a = new GridCacheAtomicLongImpl(name, key, 
atomicLongView, dsCacheCtx);
+                    a = new GridCacheAtomicLongImpl(name, key, atomicLongView, 
dsCacheCtx);
 
-                        dsMap.put(key, a);
+                    dsMap.put(key, a);
 
-                        tx.commit();
+                    tx.commit();
 
-                        return a;
-                    }
-                    catch (Error | Exception e) {
-                        dsMap.remove(key);
+                    return a;
+                }
+                catch (Error | Exception e) {
+                    dsMap.remove(key);
 
-                        U.error(log, "Failed to make atomic long: " + name, e);
+                    U.error(log, "Failed to make atomic long: " + name, e);
 
-                        throw e;
-                    }
+                    throw e;
                 }
-            }, dsCacheCtx);
-        }
-        catch (Exception e) {
-            throw new IgniteCheckedException("Failed to get atomic long by 
name: " + name, e);
-        }
-        finally {
-            dsCacheCtx.gate().leave();
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        }, new DataStructureInfo(name, DataStructureType.ATOMIC_LONG, null), 
create);
+    }
+
+    /**
+     * @param c Closure creating data structure instance.
+     * @param dsInfo Data structure info.
+     * @param create Create flag.
+     * @return Data structure instance.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable private <T> T getAtomic(Callable<T> c,
+        DataStructureInfo dsInfo,
+        boolean create)
+        throws IgniteCheckedException
+    {
+        Map<String, DataStructureInfo> dsMap = 
utilityCache.get(DATA_STRUCTURES_KEY);
+
+        if (!create && (dsMap == null || !dsMap.containsKey(dsInfo.name)))
+            return null;
+
+        IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, 
create);
+
+        if (err != null)
+            throw err;
+
+        T dataStructure;
+
+        try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, 
REPEATABLE_READ)) {
+            if (create) {
+                err =  utilityCache.invoke(DATA_STRUCTURES_KEY, new 
AddAtomicProcessor(dsInfo)).get();
+
+                if (err != null)
+                    throw err;
+            }
+            else {
+                T2<Boolean, IgniteCheckedException> res =
+                        utilityCache.invoke(DATA_STRUCTURES_KEY, new 
ContainsAtomicProcessor(dsInfo)).get();
+
+                err = res.get2();
+
+                if (err != null)
+                    throw err;
+
+                if (!res.get1())
+                    return null;
+            }
+
+            dataStructure = ctx.closure().callLocalSafe(c, false).get();
+
+            tx.commit();
         }
+
+        return dataStructure;
     }
 
     /**
@@ -350,23 +401,77 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
      * @param name Atomic long name.
      * @throws IgniteCheckedException If removing failed.
      */
-    public final void removeAtomicLong(String name) throws 
IgniteCheckedException {
+    public final void removeAtomicLong(final String name) throws 
IgniteCheckedException {
         assert name != null;
         assert dsCacheCtx != null;
 
-        dsCacheCtx.gate().enter();
+        removeDataStructure(new IgniteCallable<Void>() {
+            @Override public Void call() throws Exception {
+                dsCacheCtx.gate().enter();
 
-        try {
-            GridCacheInternal key = new GridCacheInternalKeyImpl(name);
+                try {
+                    removeInternal(new GridCacheInternalKeyImpl(name), 
GridCacheAtomicLongValue.class);
+                }
+                catch (Exception e) {
+                    throw new IgniteCheckedException("Failed to remove atomic 
long by name: " + name, e);
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
 
-            removeInternal(key, GridCacheAtomicLongValue.class);
-        }
-        catch (Exception e) {
-            throw new IgniteCheckedException("Failed to remove atomic long by 
name: " + name, e);
-        }
-        finally {
-            dsCacheCtx.gate().leave();
+                return null;
+            }
+        }, name, DataStructureType.ATOMIC_LONG, null);
+    }
+
+    /**
+     * @param c Closure.
+     * @param name Data structure name.
+     * @param type Data structure type.
+     * @throws IgniteCheckedException If failed.
+     */
+    private <T> void removeDataStructure(IgniteCallable<T> c,
+        String name,
+        DataStructureType type,
+        @Nullable IgniteInClosureX<T> afterRmv)
+        throws IgniteCheckedException
+    {
+        Map<String, DataStructureInfo> dsMap = 
utilityCache.get(DATA_STRUCTURES_KEY);
+
+        if (dsMap == null || !dsMap.containsKey(name))
+            return;
+
+        DataStructureInfo dsInfo = new DataStructureInfo(name, type, null);
+
+        IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, 
false);
+
+        if (err != null)
+            throw err;
+
+        T rmvInfo;
+
+        try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, 
REPEATABLE_READ)) {
+            T2<Boolean, IgniteCheckedException> res =
+                utilityCache.invoke(DATA_STRUCTURES_KEY, new 
RemoveDataStructureProcessor(dsInfo)).get();
+
+            err = res.get2();
+
+            if (err != null)
+                throw err;
+
+            if (!res.get1()) {
+                tx.commit();
+
+                return;
+            }
+
+            rmvInfo = ctx.closure().callLocalSafe(c, false).get();
+
+            tx.commit();
         }
+
+        if (afterRmv != null && rmvInfo != null)
+            afterRmv.applyx(rmvInfo);
     }
 
     /**
@@ -389,66 +494,61 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
-        dsCacheCtx.gate().enter();
+        final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
 
-        try {
-            final GridCacheInternalKey key = new 
GridCacheInternalKeyImpl(name);
+        // Check type of structure received by key from local cache.
+        IgniteAtomicReference atomicRef = cast(dsMap.get(key), 
IgniteAtomicReference.class);
 
-            // Check type of structure received by key from local cache.
-            IgniteAtomicReference atomicRef = cast(dsMap.get(key), 
IgniteAtomicReference.class);
+        if (atomicRef != null)
+            return atomicRef;
 
-            if (atomicRef != null)
-                return atomicRef;
+        return getAtomic(new Callable<IgniteAtomicReference<T>>() {
+            @Override public IgniteAtomicReference<T> call() throws Exception {
+                dsCacheCtx.gate().enter();
 
-            return CU.outTx(new Callable<IgniteAtomicReference<T>>() {
-                @Override public IgniteAtomicReference<T> call() throws 
Exception {
-                    try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
-                        GridCacheAtomicReferenceValue val = 
cast(dsView.get(key),
-                            GridCacheAtomicReferenceValue.class);
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    GridCacheAtomicReferenceValue val = cast(dsView.get(key),
+                        GridCacheAtomicReferenceValue.class);
 
-                        // Check that atomic reference hasn't been created in 
other thread yet.
-                        GridCacheAtomicReferenceEx ref = cast(dsMap.get(key),
-                            GridCacheAtomicReferenceEx.class);
+                    // Check that atomic reference hasn't been created in 
other thread yet.
+                    GridCacheAtomicReferenceEx ref = cast(dsMap.get(key),
+                        GridCacheAtomicReferenceEx.class);
 
-                        if (ref != null) {
-                            assert val != null;
+                    if (ref != null) {
+                        assert val != null;
 
-                            return ref;
-                        }
+                        return ref;
+                    }
 
-                        if (val == null && !create)
-                            return null;
+                    if (val == null && !create)
+                        return null;
 
-                        if (val == null) {
-                            val = new GridCacheAtomicReferenceValue(initVal);
+                    if (val == null) {
+                        val = new GridCacheAtomicReferenceValue(initVal);
 
-                            dsView.putx(key, val);
-                        }
+                        dsView.putx(key, val);
+                    }
 
-                        ref = new GridCacheAtomicReferenceImpl(name, key, 
atomicRefView, dsCacheCtx);
+                    ref = new GridCacheAtomicReferenceImpl(name, key, 
atomicRefView, dsCacheCtx);
 
-                        dsMap.put(key, ref);
+                    dsMap.put(key, ref);
 
-                        tx.commit();
+                    tx.commit();
 
-                        return ref;
-                    }
-                    catch (Error | Exception e) {
-                        dsMap.remove(key);
+                    return ref;
+                }
+                catch (Error | Exception e) {
+                    dsMap.remove(key);
 
-                        U.error(log, "Failed to make atomic reference: " + 
name, e);
+                    U.error(log, "Failed to make atomic reference: " + name, 
e);
 
-                        throw e;
-                    }
+                    throw e;
                 }
-            }, dsCacheCtx);
-        }
-        catch (Exception e) {
-            throw new IgniteCheckedException("Failed to get atomic reference 
by name: " + name, e);
-        }
-        finally {
-            dsCacheCtx.gate().leave();
-        }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        }, new DataStructureInfo(name, DataStructureType.ATOMIC_REF, null), 
create);
     }
 
     /**
@@ -457,23 +557,29 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
      * @param name Atomic reference name.
      * @throws IgniteCheckedException If removing failed.
      */
-    public final void removeAtomicReference(String name) throws 
IgniteCheckedException {
+    public final void removeAtomicReference(final String name) throws 
IgniteCheckedException {
         assert name != null;
         assert dsCacheCtx != null;
 
-        dsCacheCtx.gate().enter();
+        removeDataStructure(new IgniteCallable<Void>() {
+            @Override public Void call() throws Exception {
+                dsCacheCtx.gate().enter();
 
-        try {
-            GridCacheInternal key = new GridCacheInternalKeyImpl(name);
+                try {
+                    GridCacheInternal key = new GridCacheInternalKeyImpl(name);
 
-            removeInternal(key, GridCacheAtomicReferenceValue.class);
-        }
-        catch (Exception e) {
-            throw new IgniteCheckedException("Failed to remove atomic 
reference by name: " + name, e);
-        }
-        finally {
-            dsCacheCtx.gate().leave();
-        }
+                    removeInternal(key, GridCacheAtomicReferenceValue.class);
+                }
+                catch (Exception e) {
+                    throw new IgniteCheckedException("Failed to remove atomic 
reference by name: " + name, e);
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+
+                return null;
+            }
+        }, name, DataStructureType.ATOMIC_REF, null);
     }
 
     /**
@@ -495,66 +601,61 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
-        dsCacheCtx.gate().enter();
+        final GridCacheInternalKeyImpl key = new 
GridCacheInternalKeyImpl(name);
 
-        try {
-            final GridCacheInternalKeyImpl key = new 
GridCacheInternalKeyImpl(name);
+        // Check type of structure received by key from local cache.
+        IgniteAtomicStamped atomicStamped = cast(dsMap.get(key), 
IgniteAtomicStamped.class);
 
-            // Check type of structure received by key from local cache.
-            IgniteAtomicStamped atomicStamped = cast(dsMap.get(key), 
IgniteAtomicStamped.class);
+        if (atomicStamped != null)
+            return atomicStamped;
 
-            if (atomicStamped != null)
-                return atomicStamped;
+        return getAtomic(new Callable<IgniteAtomicStamped<T, S>>() {
+            @Override public IgniteAtomicStamped<T, S> call() throws Exception 
{
+                dsCacheCtx.gate().enter();
 
-            return CU.outTx(new Callable<IgniteAtomicStamped<T, S>>() {
-                @Override public IgniteAtomicStamped<T, S> call() throws 
Exception {
-                    try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
-                        GridCacheAtomicStampedValue val = cast(dsView.get(key),
-                            GridCacheAtomicStampedValue.class);
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    GridCacheAtomicStampedValue val = cast(dsView.get(key),
+                        GridCacheAtomicStampedValue.class);
 
-                        // Check that atomic stamped hasn't been created in 
other thread yet.
-                        GridCacheAtomicStampedEx stmp = cast(dsMap.get(key),
-                            GridCacheAtomicStampedEx.class);
+                    // Check that atomic stamped hasn't been created in other 
thread yet.
+                    GridCacheAtomicStampedEx stmp = cast(dsMap.get(key),
+                        GridCacheAtomicStampedEx.class);
 
-                        if (stmp != null) {
-                            assert val != null;
+                    if (stmp != null) {
+                        assert val != null;
 
-                            return stmp;
-                        }
+                        return stmp;
+                    }
 
-                        if (val == null && !create)
-                            return null;
+                    if (val == null && !create)
+                        return null;
 
-                        if (val == null) {
-                            val = new GridCacheAtomicStampedValue(initVal, 
initStamp);
+                    if (val == null) {
+                        val = new GridCacheAtomicStampedValue(initVal, 
initStamp);
 
-                            dsView.putx(key, val);
-                        }
+                        dsView.putx(key, val);
+                    }
 
-                        stmp = new GridCacheAtomicStampedImpl(name, key, 
atomicStampedView, dsCacheCtx);
+                    stmp = new GridCacheAtomicStampedImpl(name, key, 
atomicStampedView, dsCacheCtx);
 
-                        dsMap.put(key, stmp);
+                    dsMap.put(key, stmp);
 
-                        tx.commit();
+                    tx.commit();
 
-                        return stmp;
-                    }
-                    catch (Error | Exception e) {
-                        dsMap.remove(key);
+                    return stmp;
+                }
+                catch (Error | Exception e) {
+                    dsMap.remove(key);
 
-                        U.error(log, "Failed to make atomic stamped: " + name, 
e);
+                    U.error(log, "Failed to make atomic stamped: " + name, e);
 
-                        throw e;
-                    }
+                    throw e;
                 }
-            }, dsCacheCtx);
-        }
-        catch (Exception e) {
-            throw new IgniteCheckedException("Failed to get atomic stamped by 
name: " + name, e);
-        }
-        finally {
-            dsCacheCtx.gate().leave();
-        }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        }, new DataStructureInfo(name, DataStructureType.ATOMIC_STAMPED, 
null), create);
     }
 
     /**
@@ -563,23 +664,29 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
      * @param name Atomic stamped name.
      * @throws IgniteCheckedException If removing failed.
      */
-    public final void removeAtomicStamped(String name) throws 
IgniteCheckedException {
+    public final void removeAtomicStamped(final String name) throws 
IgniteCheckedException {
         assert name != null;
         assert dsCacheCtx != null;
 
-        dsCacheCtx.gate().enter();
+        removeDataStructure(new IgniteCallable<Void>() {
+            @Override public Void call() throws Exception {
+                dsCacheCtx.gate().enter();
 
-        try {
-            GridCacheInternal key = new GridCacheInternalKeyImpl(name);
+                try {
+                    GridCacheInternal key = new GridCacheInternalKeyImpl(name);
 
-            removeInternal(key, GridCacheAtomicStampedValue.class);
-        }
-        catch (Exception e) {
-            throw new IgniteCheckedException("Failed to remove atomic stamped 
by name: " + name, e);
-        }
-        finally {
-            dsCacheCtx.gate().leave();
-        }
+                    removeInternal(key, GridCacheAtomicStampedValue.class);
+                }
+                catch (Exception e) {
+                    throw new IgniteCheckedException("Failed to remove atomic 
stamped by name: " + name, e);
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+
+                return null;
+            }
+        }, name, DataStructureType.ATOMIC_STAMPED, null);
     }
 
     /**
@@ -594,9 +701,9 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
      */
     @SuppressWarnings("unchecked")
     public final <T> IgniteQueue<T> queue(final String name,
-        @Nullable IgniteCollectionConfiguration cfg,
+        @Nullable final IgniteCollectionConfiguration cfg,
         int cap,
-        boolean create)
+        final boolean create)
         throws IgniteCheckedException {
         A.notNull(name, "name");
 
@@ -610,48 +717,117 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
                 throw new IgniteCheckedException("Cache for collection is not 
configured: " + cfg.getCacheName());
         }
 
+        DataStructureInfo dsInfo = new DataStructureInfo(name,
+            DataStructureType.QUEUE,
+            create ? new QueueInfo(cfg.getCacheName(), cfg.isCollocated(), 
cap) : null);
+
+        final int cap0 = cap;
+
+        return getCollection(new IgniteClosureX<GridCacheContext, 
IgniteQueue<T>>() {
+            @Override public IgniteQueue<T> applyx(GridCacheContext ctx) 
throws IgniteCheckedException {
+                return ctx.dataStructures().queue(name, cap0, create && 
cfg.isCollocated(), create);
+            }
+        }, dsInfo, create);
+    }
+
+    /**
+     * @param name Queue name.
+     * @param cctx Queue cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void removeQueue(final String name, final GridCacheContext cctx) 
throws IgniteCheckedException {
+        assert name != null;
+        assert cctx != null;
+
+        IgniteCallable<GridCacheQueueHeader> rmv = new 
IgniteCallable<GridCacheQueueHeader>() {
+            @Override public GridCacheQueueHeader call() throws Exception {
+                return (GridCacheQueueHeader)retryRemove(cctx.cache(), new 
GridCacheQueueHeaderKey(name));
+            }
+        };
+
+        CIX1<GridCacheQueueHeader> afterRmv = new CIX1<GridCacheQueueHeader>() 
{
+            @Override public void applyx(GridCacheQueueHeader hdr) throws 
IgniteCheckedException {
+                if (hdr.empty())
+                    return;
+
+                GridCacheQueueAdapter.removeKeys(cctx.cache(),
+                    hdr.id(),
+                    name,
+                    hdr.collocated(),
+                    hdr.head(),
+                    hdr.tail(),
+                    0);
+            }
+        };
+
+        removeDataStructure(rmv, name, DataStructureType.QUEUE, afterRmv);
+    }
+
+    /**
+     * @param c Closure creating collection.
+     * @param dsInfo Data structure info.
+     * @param create Create flag.
+     * @return Collection instance.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable private <T> T getCollection(final 
IgniteClosureX<GridCacheContext, T> c,
+        DataStructureInfo dsInfo,
+        boolean create)
+        throws IgniteCheckedException
+    {
         Map<String, DataStructureInfo> dsMap = 
utilityCache.get(DATA_STRUCTURES_KEY);
 
-        if (!create && (dsMap == null || !dsMap.containsKey(name)))
+        if (!create && (dsMap == null || !dsMap.containsKey(dsInfo.name)))
             return null;
 
-        DataStructureInfo dsInfo = new DataStructureInfo(name,
-            DataStructureType.QUEUE,
-            create ? new QueueInfo(cfg.isCollocated(), cap, 
cfg.getCacheName()) : null);
-
-        IgniteCheckedException err = 
validateDataStructure(utilityCache.get(DATA_STRUCTURES_KEY), dsInfo, create);
+        IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, 
create);
 
         if (err != null)
             throw err;
 
+        T col;
+
         try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, 
REPEATABLE_READ)) {
-            dsMap = utilityCache.get(DATA_STRUCTURES_KEY);
+            final String cacheName;
 
-            validateDataStructure(dsMap, dsInfo, create);
+            if (create) {
+                T2<String, IgniteCheckedException> res =
+                    utilityCache.invoke(DATA_STRUCTURES_KEY, new 
AddCollectionProcessor(dsInfo)).get();
 
-            String cacheName;
+                err = res.get2();
 
-            if (create) {
-                if (dsMap == null)  {
-                    dsMap = new HashMap<>();
+                if (err != null)
+                    throw err;
 
-                    dsMap.put(name, dsInfo);
-                }
+                cacheName = res.get1();
+            }
+            else {
+                T3<Boolean, String, IgniteCheckedException> res =
+                    utilityCache.invoke(DATA_STRUCTURES_KEY, new 
ContainsCollectionProcessor(dsInfo)).get();
+
+                err = res.get3();
+
+                if (err != null)
+                    throw err;
 
-                DataStructureInfo info = dsMap.get(name);
+                if (!res.get1())
+                    return null;
 
-                if (info == null)
-                    dsMap.put(name, info);
+                cacheName = res.get2();
             }
-            else if (dsMap == null || !dsMap.containsKey(name))
-                return null;
+
+            final GridCacheContext cacheCtx = 
ctx.cache().internalCache(cacheName).context();
+
+            col = ctx.closure().callLocalSafe(new Callable<T>() {
+                @Override public T call() throws Exception {
+                    return c.applyx(cacheCtx);
+                }
+            }, false).get();
 
             tx.commit();
         }
 
-        GridCacheAdapter cache = cacheForCollection(cfg);
-
-        return cache.context().dataStructures().queue(name, cap, create && 
cfg.isCollocated(), create);
+        return col;
     }
 
     /**
@@ -670,7 +846,7 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
         DataStructureInfo oldInfo = dsMap.get(info.name);
 
         if (oldInfo != null)
-            return oldInfo.validateConfiguration(info, create);
+            return oldInfo.validate(info, create);
 
         return null;
     }
@@ -702,66 +878,61 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
-        dsCacheCtx.gate().enter();
+        final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
 
-        try {
-            final GridCacheInternalKey key = new 
GridCacheInternalKeyImpl(name);
+        // Check type of structure received by key from local cache.
+        GridCacheCountDownLatchEx latch = cast(dsMap.get(key), 
GridCacheCountDownLatchEx.class);
 
-            // Check type of structure received by key from local cache.
-            IgniteCountDownLatch latch = cast(dsMap.get(key), 
IgniteCountDownLatch.class);
+        if (latch != null)
+            return latch;
 
-            if (latch != null)
-                return latch;
+        return getAtomic(new Callable<IgniteCountDownLatch>() {
+            @Override public IgniteCountDownLatch call() throws Exception {
+                dsCacheCtx.gate().enter();
 
-            return CU.outTx(new Callable<IgniteCountDownLatch>() {
-                @Override public IgniteCountDownLatch call() throws Exception {
-                    try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
-                        GridCacheCountDownLatchValue val = 
cast(dsView.get(key),
-                            GridCacheCountDownLatchValue.class);
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    GridCacheCountDownLatchValue val = cast(dsView.get(key),
+                        GridCacheCountDownLatchValue.class);
 
-                        // Check that count down hasn't been created in other 
thread yet.
-                        GridCacheCountDownLatchEx latch = cast(dsMap.get(key), 
GridCacheCountDownLatchEx.class);
+                    // Check that count down hasn't been created in other 
thread yet.
+                    GridCacheCountDownLatchEx latch = cast(dsMap.get(key), 
GridCacheCountDownLatchEx.class);
 
-                        if (latch != null) {
-                            assert val != null;
+                    if (latch != null) {
+                        assert val != null;
 
-                            return latch;
-                        }
+                        return latch;
+                    }
 
-                        if (val == null && !create)
-                            return null;
+                    if (val == null && !create)
+                        return null;
 
-                        if (val == null) {
-                            val = new GridCacheCountDownLatchValue(cnt, 
autoDel);
+                    if (val == null) {
+                        val = new GridCacheCountDownLatchValue(cnt, autoDel);
 
-                            dsView.putx(key, val);
-                        }
+                        dsView.putx(key, val);
+                    }
 
-                        latch = new GridCacheCountDownLatchImpl(name, 
val.get(), val.initialCount(),
-                            val.autoDelete(), key, cntDownLatchView, 
dsCacheCtx);
+                    latch = new GridCacheCountDownLatchImpl(name, val.get(), 
val.initialCount(),
+                        val.autoDelete(), key, cntDownLatchView, dsCacheCtx);
 
-                        dsMap.put(key, latch);
+                    dsMap.put(key, latch);
 
-                        tx.commit();
+                    tx.commit();
 
-                        return latch;
-                    }
-                    catch (Error | Exception e) {
-                        dsMap.remove(key);
+                    return latch;
+                }
+                catch (Error | Exception e) {
+                    dsMap.remove(key);
 
-                        U.error(log, "Failed to create count down latch: " + 
name, e);
+                    U.error(log, "Failed to create count down latch: " + name, 
e);
 
-                        throw e;
-                    }
+                    throw e;
                 }
-            }, dsCacheCtx);
-        }
-        catch (Exception e) {
-            throw new IgniteCheckedException("Failed to get count down latch 
by name: " + name, e);
-        }
-        finally {
-            dsCacheCtx.gate().leave();
-        }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        }, new DataStructureInfo(name, DataStructureType.COUNT_DOWN_LATCH, 
null), create);
     }
 
     /**
@@ -774,47 +945,40 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
         assert name != null;
         assert dsCacheCtx != null;
 
-        dsCacheCtx.gate().enter();
+        removeDataStructure(new IgniteCallable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                GridCacheInternal key = new GridCacheInternalKeyImpl(name);
 
-        try {
-            CU.outTx(new Callable<Boolean>() {
-                @Override public Boolean call() throws Exception {
-                    GridCacheInternal key = new GridCacheInternalKeyImpl(name);
+                dsCacheCtx.gate().enter();
 
-                    try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
-                        // Check correctness type of removable object.
-                        GridCacheCountDownLatchValue val =
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, 
dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    // Check that the object being removed has the expected type.
+                    GridCacheCountDownLatchValue val =
                             cast(dsView.get(key), 
GridCacheCountDownLatchValue.class);
 
-                        if (val != null) {
-                            if (val.get() > 0) {
-                                throw new IgniteCheckedException("Failed to 
remove count down latch " +
+                    if (val != null) {
+                        if (val.get() > 0) {
+                            throw new IgniteCheckedException("Failed to remove 
count down latch " +
                                     "with non-zero count: " + val.get());
-                            }
+                        }
 
-                            dsView.removex(key);
+                        dsView.removex(key);
 
-                            tx.commit();
-                        }
-                        else
-                            tx.setRollbackOnly();
+                        tx.commit();
+                    } else
+                        tx.setRollbackOnly();
 
-                        return val != null;
-                    }
-                    catch (Error | Exception e) {
-                        U.error(log, "Failed to remove data structure: " + 
key, e);
+                    return null;
+                } catch (Error | Exception e) {
+                    U.error(log, "Failed to remove data structure: " + key, e);
 
-                        throw e;
-                    }
+                    throw e;
+                } finally {
+                    dsCacheCtx.gate().leave();
                 }
-            }, dsCacheCtx);
-        }
-        catch (Exception e) {
-            throw new IgniteCheckedException("Failed to remove count down 
latch by name: " + name, e);
-        }
-        finally {
-            dsCacheCtx.gate().leave();
-        }
+            }
+        }, name, DataStructureType.COUNT_DOWN_LATCH, null);
     }
 
     /**
@@ -926,17 +1090,66 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
      */
     @SuppressWarnings("unchecked")
     @Nullable public <T> IgniteSet<T> set(final String name,
-        @Nullable IgniteCollectionConfiguration cfg,
+        @Nullable final IgniteCollectionConfiguration cfg,
         final boolean create)
         throws IgniteCheckedException {
         A.notNull(name, "name");
 
-        if (create)
+        if (create) {
             A.notNull(cfg, "cfg");
 
-        GridCacheAdapter cache = cacheForCollection(cfg);
+            if (ctx.cache().publicCache(cfg.getCacheName()) == null)
+                throw new IgniteCheckedException("Cache for collection is not 
configured: " + cfg.getCacheName());
+        }
 
-        return cache.context().dataStructures().set(name, create ? 
cfg.isCollocated() : false, create);
+        DataStructureInfo dsInfo = new DataStructureInfo(name,
+            DataStructureType.SET,
+            create ? new CollectionInfo(cfg.getCacheName(), 
cfg.isCollocated()) : null);
+
+        return getCollection(new CX1<GridCacheContext, IgniteSet<T>>() {
+            @Override public IgniteSet<T> applyx(GridCacheContext cctx) throws 
IgniteCheckedException {
+                return cctx.dataStructures().set(name, create ? 
cfg.isCollocated() : false, create);
+            }
+        }, dsInfo, create);
+    }
+
+    /**
+     * @param name Set name.
+     * @param cctx Set cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void removeSet(final String name, final GridCacheContext cctx) 
throws IgniteCheckedException {
+        assert name != null;
+        assert cctx != null;
+
+        IgniteCallable<GridCacheSetHeader> rmv = new 
IgniteCallable<GridCacheSetHeader>() {
+            @Override public GridCacheSetHeader call() throws Exception {
+                return (GridCacheSetHeader)retryRemove(cctx.cache(), new 
GridCacheSetHeaderKey(name));
+            }
+        };
+
+        CIX1<GridCacheSetHeader> afterRmv = new CIX1<GridCacheSetHeader>() {
+            @Override public void applyx(GridCacheSetHeader hdr) throws 
IgniteCheckedException {
+                cctx.dataStructures().removeSetData(hdr.id());
+            }
+        };
+
+        removeDataStructure(rmv, name, DataStructureType.SET, afterRmv);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key to remove.
+     * @throws IgniteCheckedException If failed.
+     * @return Removed value.
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable private <T> T retryRemove(final GridCache cache, final Object 
key) throws IgniteCheckedException {
+        return retry(log, new Callable<T>() {
+            @Nullable @Override public T call() throws Exception {
+                return (T)cache.remove(key);
+            }
+        });
     }
 
     /**
@@ -953,10 +1166,10 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
                 try {
                     return call.call();
                 }
-                catch (ClusterGroupEmptyException e) {
-                    throw new IgniteException(e);
+                catch (ClusterGroupEmptyCheckedException e) {
+                    throw new IgniteCheckedException(e);
                 }
-                catch (IgniteTxRollbackException | 
CachePartialUpdateCheckedException | ClusterTopologyException e) {
+                catch (IgniteTxRollbackCheckedException | 
CachePartialUpdateCheckedException | ClusterTopologyCheckedException e) {
                     if (cnt++ == MAX_UPDATE_RETRIES)
                         throw e;
                     else {
@@ -967,11 +1180,11 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
                 }
             }
         }
-        catch (IgniteCheckedException | IgniteException e) {
+        catch (IgniteCheckedException e) {
             throw e;
         }
         catch (Exception e) {
-            throw new IgniteException(e);
+            throw new IgniteCheckedException(e);
         }
     }
 
@@ -1005,22 +1218,6 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * @param cfg Collection configuration.
-     * @return Cache to use for collection.
-     * @throws IgniteCheckedException If cache is not configured.
-     */
-    private GridCacheAdapter cacheForCollection(IgniteCollectionConfiguration 
cfg) throws IgniteCheckedException {
-        if (ctx.cache().publicCache(cfg.getCacheName()) == null)
-            throw new IgniteCheckedException("Cache for collection is not 
configured: " + cfg.getCacheName());
-
-        GridCacheAdapter cache = ctx.cache().internalCache(cfg.getCacheName());
-
-        assert cache != null : cfg.getCacheName();
-
-        return cache;
-    }
-
-    /**
      * @throws IgniteException If atomics configuration is not provided.
      */
     private void checkAtomicsConfiguration() throws IgniteException {
@@ -1046,6 +1243,9 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
         ATOMIC_STAMPED(IgniteAtomicStamped.class.getSimpleName()),
 
         /** */
+        COUNT_DOWN_LATCH(IgniteCountDownLatch.class.getSimpleName()),
+
+        /** */
         QUEUE(IgniteQueue.class.getSimpleName()),
 
         /** */
@@ -1083,7 +1283,7 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
     /**
      *
      */
-    static class SetInfo implements Externalizable {
+    static class CollectionInfo implements Externalizable {
         /** */
         private boolean collocated;
 
@@ -1093,41 +1293,43 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
         /**
          * Required by {@link Externalizable}.
          */
-        public SetInfo() {
+        public CollectionInfo() {
             // No-op.
         }
 
         /**
          * @param collocated Collocated flag.
          */
-        public SetInfo(boolean collocated) {
+        public CollectionInfo(String cacheName, boolean collocated) {
+            this.cacheName = cacheName;
             this.collocated = collocated;
         }
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
             collocated = in.readBoolean();
+            cacheName = U.readString(in);
         }
 
         /** {@inheritDoc} */
         @Override public void writeExternal(ObjectOutput out) throws 
IOException {
             out.writeBoolean(collocated);
+            U.writeString(out, cacheName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CollectionInfo.class, this);
         }
     }
 
     /**
      *
      */
-    static class QueueInfo implements Externalizable {
-        /** */
-        private boolean collocated;
-
+    static class QueueInfo extends CollectionInfo {
         /** */
         private int cap;
 
-        /** */
-        private String cacheName;
-
         /**
          * Required by {@link Externalizable}.
          */
@@ -1140,24 +1342,29 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
          * @param cap Queue capacity.
          * @param cacheName Cache name.
          */
-        public QueueInfo(boolean collocated, int cap, String cacheName) {
-            this.collocated = collocated;
+        public QueueInfo(String cacheName, boolean collocated, int cap) {
+            super(cacheName, collocated);
+
             this.cap = cap;
-            this.cacheName = cacheName;
         }
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            collocated = in.readBoolean();
+            super.readExternal(in);
+
             cap = in.readInt();
-            cacheName = U.readString(in);
         }
 
         /** {@inheritDoc} */
         @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            out.writeBoolean(collocated);
+            super.writeExternal(out);
+
             out.writeInt(cap);
-            U.writeString(out, cacheName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueueInfo.class, this, "super", 
super.toString());
         }
     }
 
@@ -1200,42 +1407,33 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
          * @param create Create flag.
          * @return Exception if validation failed.
          */
-        @Nullable IgniteCheckedException 
validateConfiguration(DataStructureInfo dsInfo, boolean create) {
+        @Nullable IgniteCheckedException validate(DataStructureInfo dsInfo, 
boolean create) {
             if (type != dsInfo.type) {
                 return new IgniteCheckedException("Another data structure with 
the same name already created " +
-                    "[name= " + name +
-                    ", new= " + dsInfo.type.className() +
-                    ", existing=" + type.className() + ']');
+                    "[name=" + name +
+                    ", newType=" + dsInfo.type.className() +
+                    ", existingType=" + type.className() + ']');
             }
 
             if (create) {
-                if (type == DataStructureType.QUEUE ) {
-                    QueueInfo oldInfo = (QueueInfo)info;
-                    QueueInfo newInfo = (QueueInfo)dsInfo.info;
+                if (type == DataStructureType.QUEUE || type == 
DataStructureType.SET) {
+                    CollectionInfo oldInfo = (CollectionInfo)info;
+                    CollectionInfo newInfo = (CollectionInfo)dsInfo.info;
 
                     if (oldInfo.collocated != newInfo.collocated) {
-                        return new IgniteCheckedException("Another queue with 
the same name but different " +
-                            "configuration already created [name= " + name +
-                            ", newCollocated= " + newInfo.collocated +
+                        return new IgniteCheckedException("Another collection 
with the same name but different " +
+                            "configuration already created [name=" + name +
+                            ", newCollocated=" + newInfo.collocated +
                             ", existingCollocated=" + newInfo.collocated + 
']');
                     }
 
-                    if (oldInfo.cap != newInfo.cap) {
-                        return new IgniteCheckedException("Another queue with 
the same name but different " +
-                            "configuration already created [name= " + name +
-                            ", newCapacity= " + newInfo.cap+
-                            ", existingCapacity=" + newInfo.cap + ']');
-                    }
-                }
-                else if (type == DataStructureType.SET ) {
-                    SetInfo oldInfo = (SetInfo)info;
-                    SetInfo newInfo = (SetInfo)dsInfo.info;
-
-                    if (oldInfo.collocated != newInfo.collocated) {
-                        return new IgniteCheckedException("Another set with 
the same name but different " +
-                            "configuration already created [name= " + name +
-                            ", newCollocated= " + newInfo.collocated +
-                            ", existingCollocated=" + newInfo.collocated + 
']');
+                    if (type == DataStructureType.QUEUE) {
+                        if (((QueueInfo)oldInfo).cap != 
((QueueInfo)newInfo).cap) {
+                            return new IgniteCheckedException("Another queue 
with the same name but different " +
+                                "configuration already created [name=" + name +
+                                ", newCapacity=" + ((QueueInfo)newInfo).cap +
+                                ", existingCapacity=" + 
((QueueInfo)oldInfo).cap + ']');
+                        }
                     }
                 }
             }
@@ -1256,12 +1454,140 @@ public final class CacheDataStructuresProcessor 
extends GridProcessorAdapter {
             type = DataStructureType.fromOrdinal(in.readByte());
             info = in.readObject();
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DataStructureInfo.class, this);
+        }
+    }
+
+    /**
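+     * Checks whether a data structure with the given name is registered and validates it against the given info.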
+     */
+    static class ContainsAtomicProcessor implements
+        EntryProcessor<CacheDataStructuresConfigurationKey, Map<String, 
DataStructureInfo>, T2<Boolean, IgniteCheckedException>>,
+        Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private DataStructureInfo info;
+
+        /**
+         * @param info Data structure information.
+         */
+        ContainsAtomicProcessor(DataStructureInfo info) {
+            assert info != null;
+
+            this.info = info;
+        }
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public ContainsAtomicProcessor() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public T2<Boolean, IgniteCheckedException> process(
+            MutableEntry<CacheDataStructuresConfigurationKey, Map<String, 
DataStructureInfo>> entry,
+            Object... args)
+            throws EntryProcessorException
+        {
+            Map<String, DataStructureInfo> map = entry.getValue();
+
+            if (map == null)
+                return new T2<>(false, null);
+
+            DataStructureInfo oldInfo = map.get(info.name);
+
+            if (oldInfo == null)
+                return new T2<>(false, null);
+
+            return new T2<>(true, oldInfo.validate(info, false));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            info.writeExternal(out);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            info = new DataStructureInfo();
+
+            info.readExternal(in);
+        }
+    }
+
+    /**
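+     * Checks whether a collection with the given name is registered; returns its cache name and validation result.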
+     */
+    static class ContainsCollectionProcessor implements
+        EntryProcessor<CacheDataStructuresConfigurationKey, Map<String, 
DataStructureInfo>, T3<Boolean, String, IgniteCheckedException>>,
+        Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private DataStructureInfo info;
+
+        /**
+         * @param info Data structure information.
+         */
+        ContainsCollectionProcessor(DataStructureInfo info) {
+            assert info != null;
+
+            this.info = info;
+        }
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public ContainsCollectionProcessor() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public T3<Boolean, String, IgniteCheckedException> process(
+            MutableEntry<CacheDataStructuresConfigurationKey, Map<String, 
DataStructureInfo>> entry,
+            Object... args)
+            throws EntryProcessorException
+        {
+            Map<String, DataStructureInfo> map = entry.getValue();
+
+            if (map == null)
+                return new T3<>(false, null, null);
+
+            DataStructureInfo oldInfo = map.get(info.name);
+
+            if (oldInfo == null)
+                return new T3<>(false, null, null);
+
+            assert oldInfo.info instanceof CollectionInfo : oldInfo.info;
+
+            return new T3<>(true, ((CollectionInfo)oldInfo.info).cacheName, 
oldInfo.validate(info, false));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            info.writeExternal(out);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            info = new DataStructureInfo();
+
+            info.readExternal(in);
+        }
     }
+
     /**
      *
      */
     static class AddAtomicProcessor implements
-        EntryProcessor<CacheDataStructuresConfigurationKey, Map<String, 
DataStructureInfo>, IgniteException>,
+        EntryProcessor<CacheDataStructuresConfigurationKey, Map<String, 
DataStructureInfo>, IgniteCheckedException>,
         Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
@@ -1273,6 +1599,8 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
          * @param info Data structure information.
          */
         AddAtomicProcessor(DataStructureInfo info) {
+            assert info != null;
+
             this.info = info;
         }
 
@@ -1284,7 +1612,7 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
         }
 
         /** {@inheritDoc} */
-        @Override public IgniteException process(
+        @Override public IgniteCheckedException process(
             MutableEntry<CacheDataStructuresConfigurationKey, Map<String, 
DataStructureInfo>> entry,
             Object... args)
             throws EntryProcessorException
@@ -1313,7 +1641,154 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter {
                 return null;
             }
 
-            return null;
+            return oldInfo.validate(info, true);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * The only state written is the wrapped data structure info, via {@code info.writeExternal(out)}.
+         */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            info.writeExternal(out);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Replaces {@code info} with a newly constructed {@code DataStructureInfo} and lets it read
+         * its own state from the stream.
+         */
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            info = new DataStructureInfo();
+
+            info.readExternal(in);
+        }
+    }
+
+    /**
+     * Entry processor that registers a collection in the name-to-{@link DataStructureInfo} map
+     * stored as the entry value, or checks the request against an already registered structure
+     * with the same name.
+     * <p>
+     * The result is a pair: the cache name taken from this processor's own {@link CollectionInfo}
+     * (not from any previously stored info) and the outcome of {@code validate}, which is
+     * {@code null} whenever a new registration was written.
+     */
+    static class AddCollectionProcessor implements
+        EntryProcessor<CacheDataStructuresConfigurationKey, Map<String, 
DataStructureInfo>, T2<String, IgniteCheckedException>>,
+        Externalizable {
+        /** Serialization version. */
+        private static final long serialVersionUID = 0L;
+
+        /** Info of the collection to register; reassigned by {@link #readExternal(ObjectInput)}. */
+        private DataStructureInfo info;
+
+        /**
+         * Creates a processor for the given collection. Both preconditions are checked with
+         * {@code assert} only: {@code info} is non-null and {@code info.info} is a
+         * {@link CollectionInfo}.
+         *
+         * @param info Data structure information.
+         */
+        AddCollectionProcessor(DataStructureInfo info) {
+            assert info != null;
+            assert info.info instanceof CollectionInfo;
+
+            this.info = info;
+        }
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public AddCollectionProcessor() {
+            // No-op.
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Three cases are handled:
+         * <ul>
+         * <li>the entry has no value: a new map holding only {@code info} is stored;</li>
+         * <li>the map has no structure named {@code info.name}: a copy of the map with
+         * {@code info} added is stored (the existing map instance is not modified);</li>
+         * <li>a structure with that name exists: nothing is written and the result of
+         * {@code oldInfo.validate(info, true)} is returned as the second element.</li>
+         * </ul>
+         * NOTE(review): the meaning of the boolean passed to {@code validate} is not visible
+         * here; presumably it marks a create request. Confirm against {@code DataStructureInfo}.
+         *
+         * @return Pair of this processor's collection cache name and the validation error
+         *      ({@code null} in the first two cases).
+         */
+        @Override public T2<String, IgniteCheckedException> process(
+            MutableEntry<CacheDataStructuresConfigurationKey, Map<String, 
DataStructureInfo>> entry,
+            Object... args)
+            throws EntryProcessorException
+        {
+            Map<String, DataStructureInfo> map = entry.getValue();
+
+            CollectionInfo colInfo = (CollectionInfo)info.info;
+
+            if (map == null) {
+                map = new HashMap<>();
+
+                map.put(info.name, info);
+
+                entry.setValue(map);
+
+                return new T2<>(colInfo.cacheName, null);
+            }
+
+            DataStructureInfo oldInfo = map.get(info.name);
+
+            if (oldInfo == null) {
+                map = new HashMap<>(map);
+
+                map.put(info.name, info);
+
+                entry.setValue(map);
+
+                return new T2<>(colInfo.cacheName, null);
+            }
+
+            return new T2<>(colInfo.cacheName, oldInfo.validate(info, true));
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * The only state written is the wrapped data structure info, via {@code info.writeExternal(out)}.
+         */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            info.writeExternal(out);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Replaces {@code info} with a newly constructed {@link DataStructureInfo} and lets it read
+         * its own state from the stream.
+         */
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            info = new DataStructureInfo();
+
+            info.readExternal(in);
+        }
+    }
+
+    /**
+     *
+     */
+    static class RemoveDataStructureProcessor implements
+        EntryProcessor<CacheDataStructuresConfigurationKey, Map<String, 
DataStructureInfo>, T2<Boolean, IgniteCheckedException>>,
+        Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private DataStructureInfo info;
+
+        /**
+         * Creates a processor for the structure described by {@code info}. Non-nullness of
+         * {@code info} is checked with {@code assert} only.
+         *
+         * @param info Data structure information.
+         */
+        RemoveDataStructureProcessor(DataStructureInfo info) {
+            assert info != null;
+
+            this.info = info;
+        }
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public RemoveDataStructureProcessor() {
+            // No-op.
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Looks up the structure named {@code info.name} in the map stored as the entry value.
+         * If the entry has no value or the name is not present, nothing is written and
+         * {@code (false, null)} is returned. Otherwise the stored info is checked with
+         * {@code oldInfo.validate(info, false)}; only when that yields no error is a copy of the
+         * map without the name stored back (the existing map instance is not modified).
+         *
+         * @return Pair whose first element tells whether a structure with this name was found and
+         *      whose second element is the validation error, or {@code null} if there was none.
+         */
+        @Override public T2<Boolean, IgniteCheckedException> process(
+            MutableEntry<CacheDataStructuresConfigurationKey, Map<String, 
DataStructureInfo>> entry,
+            Object... args)
+            throws EntryProcessorException
+        {
+            Map<String, DataStructureInfo> map = entry.getValue();
+
+            if (map == null)
+                return new T2<>(false, null);
+
+            DataStructureInfo oldInfo = map.get(info.name);
+
+            if (oldInfo == null)
+                return new T2<>(false, null);
+
+            IgniteCheckedException err = oldInfo.validate(info, false);
+
+            if (err == null) {
+                map = new HashMap<>(map);
+
+                map.remove(info.name);
+
+                entry.setValue(map);
+            }
+
+            return new T2<>(true, err);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9e893982/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index f614793..4c9e330 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -348,7 +348,7 @@ public abstract class GridCacheQueueAdapter<T> extends 
AbstractCollection<T> imp
 
             checkRemoved(t.get1());
 
-            removeKeys(id, queueName, collocated, t.get1(), t.get2(), 
batchSize);
+            removeKeys(cache, id, queueName, collocated, t.get1(), t.get2(), 
batchSize);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -382,6 +382,7 @@ public abstract class GridCacheQueueAdapter<T> extends 
AbstractCollection<T> imp
     }
 
     /**
+     * @param cache Queue cache.
      * @param id Queue unique ID.
      * @param name Queue name.
      * @param collocated Collocation flag.
@@ -391,7 +392,8 @@ public abstract class GridCacheQueueAdapter<T> extends 
AbstractCollection<T> imp
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
-    private void removeKeys(
+    static void removeKeys(
+        GridCacheAdapter cache,
         IgniteUuid id,
         String name,
         boolean collocated,
@@ -527,19 +529,7 @@ public abstract class GridCacheQueueAdapter<T> extends 
AbstractCollection<T> imp
             return;
 
         try {
-            GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.remove(new 
GridCacheQueueHeaderKey(queueName), null);
-
-            rmvd = true;
-
-            if (hdr == null || hdr.empty())
-                return;
-
-            removeKeys(hdr.id(),
-                queueName,
-                hdr.collocated(),
-                hdr.head(),
-                hdr.tail(),
-                0);
+            cctx.kernalContext().dataStructures().removeQueue(queueName, cctx);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9e893982/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index 06a0845..a52f8e4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -335,7 +335,7 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements Ignite
             if (rmvd)
                 return;
 
-            ctx.dataStructures().removeSet(name);
+            ctx.kernalContext().dataStructures().removeSet(name, ctx);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9e893982/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueJoinedNodeSelfAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueJoinedNodeSelfAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueJoinedNodeSelfAbstractTest.java
index e4e0811..1c3dc50 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueJoinedNodeSelfAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueJoinedNodeSelfAbstractTest.java
@@ -266,7 +266,7 @@ public abstract class 
GridCacheQueueJoinedNodeSelfAbstractTest extends IgniteCol
                 else
                     fail("Unexpected error: " + e);
             }
-            catch (IgniteCheckedException e) {
+            catch (Exception e) {
                 error("Failed to get value from the queue", e);
             }
             finally {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9e893982/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
index b625989..fb40cfc 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.processors.cache.datastructures;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.testframework.*;
@@ -27,20 +28,40 @@ import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
 import static org.apache.ignite.cache.CacheMode.*;
 
 /**
  *
  */
-public class IgniteDataStructureUniqueNameTest extends 
IgniteAtomicsAbstractTest {
+public class IgniteDataStructureUniqueNameTest extends 
IgniteCollectionAbstractTest {
     /** {@inheritDoc} */
-    @Override protected CacheMode atomicsCacheMode() {
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode collectionCacheMode() {
         return PARTITIONED;
     }
 
     /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return 3;
+    @Override protected CacheAtomicityMode collectionCacheAtomicityMode() {
+        return ATOMIC;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Extends the parent configuration with an {@link IgniteAtomicConfiguration} that uses
+     * {@code PARTITIONED} cache mode and one backup.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        IgniteAtomicConfiguration atomicCfg = new IgniteAtomicConfiguration();
+
+        atomicCfg.setBackups(1);
+        atomicCfg.setCacheMode(PARTITIONED);
+
+        cfg.setAtomicConfiguration(atomicCfg);
+
+        return cfg;
     }
 
     /**
@@ -58,6 +79,141 @@ public class IgniteDataStructureUniqueNameTest extends 
IgniteAtomicsAbstractTest
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testCreateRemove() throws Exception {
+        final String name = IgniteUuid.randomUuid().toString();
+
+        final Ignite ignite = ignite(0);
+
+        assertNull(ignite.atomicLong(name, 0, false));
+
+        IgniteAtomicReference<Integer> ref = ignite.atomicReference(name, 0, 
true);
+
+        assertNotNull(ref);
+
+        assertSame(ref, ignite.atomicReference(name, 0, true));
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ignite.atomicLong(name, 0, false);
+
+                return null;
+            }
+        }, IgniteException.class, null);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ignite.atomicLong(name, 0, true);
+
+                return null;
+            }
+        }, IgniteException.class, null);
+
+        ref.close();
+
+        IgniteAtomicLong atomicLong = ignite.atomicLong(name, 0, true);
+
+        assertNotNull(atomicLong);
+
+        assertSame(atomicLong, ignite.atomicLong(name, 0, true));
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ignite.atomicReference(name, 0, false);
+
+                return null;
+            }
+        }, IgniteException.class, null);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ignite.queue(name, config(false), 0, true);
+
+                return null;
+            }
+        }, IgniteException.class, null);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ignite.queue(name, null, 0, false);
+
+                return null;
+            }
+        }, IgniteException.class, null);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ignite.set(name, config(false), true);
+
+                return null;
+            }
+        }, IgniteException.class, null);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ignite.set(name, null, false);
+
+                return null;
+            }
+        }, IgniteException.class, null);
+
+        atomicLong.close();
+
+        IgniteQueue<Integer> q = ignite.queue(name, config(false), 0, true);
+
+        assertNotNull(q);
+
+        assertSame(q, ignite.queue(name, config(false), 0, true));
+
+        assertSame(q, ignite.queue(name, null, 0, false));
+
+        q.close();
+
+        assertNull(ignite.set(name, null, false));
+
+        IgniteSet<Integer> set = ignite.set(name, config(false), true);
+
+        assertNotNull(set);
+
+        assertSame(set, ignite.set(name, config(false), true));
+
+        assertSame(set, ignite.set(name, null, false));
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ignite.atomicReference(name, 0, false);
+
+                return null;
+            }
+        }, IgniteException.class, null);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ignite.queue(name, config(false), 0, true);
+
+                return null;
+            }
+        }, IgniteException.class, null);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                ignite.queue(name, null, 0, false);
+
+                return null;
+            }
+        }, IgniteException.class, null);
+
+        set.close();
+
+        ref = ignite.atomicReference(name, 0, true);
+
+        assertNotNull(ref);
+
+        assertSame(ref, ignite.atomicReference(name, 0, true));
+    }
+
+    /**
      * @param singleGrid If {@code true} uses single grid.
      * @throws Exception If failed.
      */

Reply via email to