Repository: incubator-ignite Updated Branches: refs/heads/ignite-6 368dd6375 -> 9e8939828
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9e893982/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java index 9704fad..951e42d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java @@ -22,8 +22,12 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.transactions.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.util.typedef.*; @@ -86,7 +90,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { private final IgniteAtomicConfiguration atomicCfg; /** */ - private GridCache<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> utilityCache; + private GridCacheProjectionEx<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> utilityCache; /** * @param ctx Context. @@ -105,7 +109,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { if (ctx.config().isDaemon()) return; - utilityCache = ctx.cache().utilityCache(); + utilityCache = (GridCacheProjectionEx)ctx.cache().utilityCache(); assert utilityCache != null; @@ -153,119 +157,119 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); - dsCacheCtx.gate().enter(); + final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); - try { - final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); + // Check type of structure received by key from local cache. + IgniteAtomicSequence val = cast(dsMap.get(key), IgniteAtomicSequence.class); - // Check type of structure received by key from local cache. - IgniteAtomicSequence val = cast(dsMap.get(key), IgniteAtomicSequence.class); + if (val != null) + return val; - if (val != null) - return val; + return getAtomic(new Callable<IgniteAtomicSequence>() { + @Override public IgniteAtomicSequence call() throws Exception { + dsCacheCtx.gate().enter(); - return CU.outTx(new Callable<IgniteAtomicSequence>() { - @Override public IgniteAtomicSequence call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicSequenceValue seqVal = cast(dsView.get(key), GridCacheAtomicSequenceValue.class); + try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicSequenceValue seqVal = cast(dsView.get(key), GridCacheAtomicSequenceValue.class); - // Check that sequence hasn't been created in other thread yet. - GridCacheAtomicSequenceEx seq = cast(dsMap.get(key), GridCacheAtomicSequenceEx.class); + // Check that sequence hasn't been created in other thread yet. + GridCacheAtomicSequenceEx seq = cast(dsMap.get(key), GridCacheAtomicSequenceEx.class); - if (seq != null) { - assert seqVal != null; + if (seq != null) { + assert seqVal != null; - return seq; - } + return seq; + } - if (seqVal == null && !create) - return null; + if (seqVal == null && !create) + return null; - // We should use offset because we already reserved left side of range. - long off = atomicCfg.getAtomicSequenceReserveSize() > 1 ? - atomicCfg.getAtomicSequenceReserveSize() - 1 : 1; + // We should use offset because we already reserved left side of range. + long off = atomicCfg.getAtomicSequenceReserveSize() > 1 ? + atomicCfg.getAtomicSequenceReserveSize() - 1 : 1; - long upBound; - long locCntr; + long upBound; + long locCntr; - if (seqVal == null) { - locCntr = initVal; + if (seqVal == null) { + locCntr = initVal; - upBound = locCntr + off; + upBound = locCntr + off; - // Global counter must be more than reserved region. - seqVal = new GridCacheAtomicSequenceValue(upBound + 1); - } - else { - locCntr = seqVal.get(); + // Global counter must be more than reserved region. + seqVal = new GridCacheAtomicSequenceValue(upBound + 1); + } + else { + locCntr = seqVal.get(); - upBound = locCntr + off; + upBound = locCntr + off; - // Global counter must be more than reserved region. - seqVal.set(upBound + 1); - } + // Global counter must be more than reserved region. + seqVal.set(upBound + 1); + } - // Update global counter. - dsView.putx(key, seqVal); + // Update global counter. + dsView.putx(key, seqVal); - // Only one thread can be in the transaction scope and create sequence. - seq = new GridCacheAtomicSequenceImpl(name, - key, - seqView, - dsCacheCtx, - atomicCfg.getAtomicSequenceReserveSize(), - locCntr, - upBound); + // Only one thread can be in the transaction scope and create sequence. + seq = new GridCacheAtomicSequenceImpl(name, + key, + seqView, + dsCacheCtx, + atomicCfg.getAtomicSequenceReserveSize(), + locCntr, + upBound); - dsMap.put(key, seq); + dsMap.put(key, seq); - tx.commit(); + tx.commit(); - return seq; - } - catch (Error | Exception e) { - dsMap.remove(key); + return seq; + } + catch (Error | Exception e) { + dsMap.remove(key); - U.error(log, "Failed to make atomic sequence: " + name, e); + U.error(log, "Failed to make atomic sequence: " + name, e); - throw e; - } + throw e; } - }, dsCacheCtx); - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to get sequence by name: " + name, e); - } - finally { - dsCacheCtx.gate().leave(); - } + finally { + dsCacheCtx.gate().leave(); + } + } + }, new DataStructureInfo(name, DataStructureType.ATOMIC_SEQ, null), create); } /** * Removes sequence from cache. * * @param name Sequence name. - * @return Method returns {@code true} if sequence has been removed and {@code false} if it's not cached. * @throws IgniteCheckedException If removing failed. */ - public final boolean removeSequence(String name) throws IgniteCheckedException { + public final void removeSequence(final String name) throws IgniteCheckedException { assert name != null; checkAtomicsConfiguration(); - dsCacheCtx.gate().enter(); + removeDataStructure(new IgniteCallable<Void>() { + @Override public Void call() throws Exception { + dsCacheCtx.gate().enter(); - try { - GridCacheInternal key = new GridCacheInternalKeyImpl(name); + try { + GridCacheInternal key = new GridCacheInternalKeyImpl(name); - return removeInternal(key, GridCacheAtomicSequenceValue.class); - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to remove sequence by name: " + name, e); - } - finally { - dsCacheCtx.gate().leave(); - } + removeInternal(key, GridCacheAtomicSequenceValue.class); + } + catch (Exception e) { + throw new IgniteCheckedException("Failed to remove sequence by name: " + name, e); + } + finally { + dsCacheCtx.gate().leave(); + } + + return null; + } + }, name, DataStructureType.ATOMIC_SEQ, null); } /** @@ -284,64 +288,111 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); - dsCacheCtx.gate().enter(); + final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); - try { - final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); + // Check type of structure received by key from local cache. + IgniteAtomicLong atomicLong = cast(dsMap.get(key), IgniteAtomicLong.class); - // Check type of structure received by key from local cache. - IgniteAtomicLong atomicLong = cast(dsMap.get(key), IgniteAtomicLong.class); + if (atomicLong != null) + return atomicLong; - if (atomicLong != null) - return atomicLong; + return getAtomic(new Callable<IgniteAtomicLong>() { + @Override public IgniteAtomicLong call() throws Exception { + dsCacheCtx.gate().enter(); - return CU.outTx(new Callable<IgniteAtomicLong>() { - @Override public IgniteAtomicLong call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicLongValue val = cast(dsView.get(key), GridCacheAtomicLongValue.class); + try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicLongValue val = cast(dsView.get(key), GridCacheAtomicLongValue.class); - // Check that atomic long hasn't been created in other thread yet. - GridCacheAtomicLongEx a = cast(dsMap.get(key), GridCacheAtomicLongEx.class); + // Check that atomic long hasn't been created in other thread yet. + GridCacheAtomicLongEx a = cast(dsMap.get(key), GridCacheAtomicLongEx.class); - if (a != null) { - assert val != null; + if (a != null) { + assert val != null; - return a; - } + return a; + } - if (val == null && !create) - return null; + if (val == null && !create) + return null; - if (val == null) { - val = new GridCacheAtomicLongValue(initVal); + if (val == null) { + val = new GridCacheAtomicLongValue(initVal); - dsView.putx(key, val); - } + dsView.putx(key, val); + } - a = new GridCacheAtomicLongImpl(name, key, atomicLongView, dsCacheCtx); + a = new GridCacheAtomicLongImpl(name, key, atomicLongView, dsCacheCtx); - dsMap.put(key, a); + dsMap.put(key, a); - tx.commit(); + tx.commit(); - return a; - } - catch (Error | Exception e) { - dsMap.remove(key); + return a; + } + catch (Error | Exception e) { + dsMap.remove(key); - U.error(log, "Failed to make atomic long: " + name, e); + U.error(log, "Failed to make atomic long: " + name, e); - throw e; - } + throw e; } - }, dsCacheCtx); - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to get atomic long by name: " + name, e); - } - finally { - dsCacheCtx.gate().leave(); + finally { + dsCacheCtx.gate().leave(); + } + } + }, new DataStructureInfo(name, DataStructureType.ATOMIC_LONG, null), create); + } + + /** + * @param c Closure creating data structure instance. + * @param dsInfo Data structure info. + * @param create Create flag. + * @return Data structure instance. + * @throws IgniteCheckedException + */ + @Nullable private <T> T getAtomic(Callable<T> c, + DataStructureInfo dsInfo, + boolean create) + throws IgniteCheckedException + { + Map<String, DataStructureInfo> dsMap = utilityCache.get(DATA_STRUCTURES_KEY); + + if (!create && (dsMap == null || !dsMap.containsKey(dsInfo.name))) + return null; + + IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, create); + + if (err != null) + throw err; + + T dataStructure; + + try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + if (create) { + err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get(); + + if (err != null) + throw err; + } + else { + T2<Boolean, IgniteCheckedException> res = + utilityCache.invoke(DATA_STRUCTURES_KEY, new ContainsAtomicProcessor(dsInfo)).get(); + + err = res.get2(); + + if (err != null) + throw err; + + if (!res.get1()) + return null; + } + + dataStructure = ctx.closure().callLocalSafe(c, false).get(); + + tx.commit(); } + + return dataStructure; } /** @@ -350,23 +401,77 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { * @param name Atomic long name. * @throws IgniteCheckedException If removing failed. */ - public final void removeAtomicLong(String name) throws IgniteCheckedException { + public final void removeAtomicLong(final String name) throws IgniteCheckedException { assert name != null; assert dsCacheCtx != null; - dsCacheCtx.gate().enter(); + removeDataStructure(new IgniteCallable<Void>() { + @Override public Void call() throws Exception { + dsCacheCtx.gate().enter(); - try { - GridCacheInternal key = new GridCacheInternalKeyImpl(name); + try { + removeInternal(new GridCacheInternalKeyImpl(name), GridCacheAtomicLongValue.class); + } + catch (Exception e) { + throw new IgniteCheckedException("Failed to remove atomic long by name: " + name, e); + } + finally { + dsCacheCtx.gate().leave(); + } - removeInternal(key, GridCacheAtomicLongValue.class); - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to remove atomic long by name: " + name, e); - } - finally { - dsCacheCtx.gate().leave(); + return null; + } + }, name, DataStructureType.ATOMIC_LONG, null); + } + + /** + * @param c Closure. + * @param name Data structure name. + * @param type Data structure type. + * @throws IgniteCheckedException If failed. + */ + private <T> void removeDataStructure(IgniteCallable<T> c, + String name, + DataStructureType type, + @Nullable IgniteInClosureX<T> afterRmv) + throws IgniteCheckedException + { + Map<String, DataStructureInfo> dsMap = utilityCache.get(DATA_STRUCTURES_KEY); + + if (dsMap == null || !dsMap.containsKey(name)) + return; + + DataStructureInfo dsInfo = new DataStructureInfo(name, type, null); + + IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, false); + + if (err != null) + throw err; + + T rmvInfo; + + try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + T2<Boolean, IgniteCheckedException> res = + utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get(); + + err = res.get2(); + + if (err != null) + throw err; + + if (!res.get1()) { + tx.commit(); + + return; + } + + rmvInfo = ctx.closure().callLocalSafe(c, false).get(); + + tx.commit(); } + + if (afterRmv != null && rmvInfo != null) + afterRmv.applyx(rmvInfo); } /** @@ -389,66 +494,61 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); - dsCacheCtx.gate().enter(); + final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); - try { - final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); + // Check type of structure received by key from local cache. + IgniteAtomicReference atomicRef = cast(dsMap.get(key), IgniteAtomicReference.class); - // Check type of structure received by key from local cache. - IgniteAtomicReference atomicRef = cast(dsMap.get(key), IgniteAtomicReference.class); + if (atomicRef != null) + return atomicRef; - if (atomicRef != null) - return atomicRef; + return getAtomic(new Callable<IgniteAtomicReference<T>>() { + @Override public IgniteAtomicReference<T> call() throws Exception { + dsCacheCtx.gate().enter(); - return CU.outTx(new Callable<IgniteAtomicReference<T>>() { - @Override public IgniteAtomicReference<T> call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicReferenceValue val = cast(dsView.get(key), - GridCacheAtomicReferenceValue.class); + try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicReferenceValue val = cast(dsView.get(key), + GridCacheAtomicReferenceValue.class); - // Check that atomic reference hasn't been created in other thread yet. - GridCacheAtomicReferenceEx ref = cast(dsMap.get(key), - GridCacheAtomicReferenceEx.class); + // Check that atomic reference hasn't been created in other thread yet. + GridCacheAtomicReferenceEx ref = cast(dsMap.get(key), + GridCacheAtomicReferenceEx.class); - if (ref != null) { - assert val != null; + if (ref != null) { + assert val != null; - return ref; - } + return ref; + } - if (val == null && !create) - return null; + if (val == null && !create) + return null; - if (val == null) { - val = new GridCacheAtomicReferenceValue(initVal); + if (val == null) { + val = new GridCacheAtomicReferenceValue(initVal); - dsView.putx(key, val); - } + dsView.putx(key, val); + } - ref = new GridCacheAtomicReferenceImpl(name, key, atomicRefView, dsCacheCtx); + ref = new GridCacheAtomicReferenceImpl(name, key, atomicRefView, dsCacheCtx); - dsMap.put(key, ref); + dsMap.put(key, ref); - tx.commit(); + tx.commit(); - return ref; - } - catch (Error | Exception e) { - dsMap.remove(key); + return ref; + } + catch (Error | Exception e) { + dsMap.remove(key); - U.error(log, "Failed to make atomic reference: " + name, e); + U.error(log, "Failed to make atomic reference: " + name, e); - throw e; - } + throw e; } - }, dsCacheCtx); - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to get atomic reference by name: " + name, e); - } - finally { - dsCacheCtx.gate().leave(); - } + finally { + dsCacheCtx.gate().leave(); + } + } + }, new DataStructureInfo(name, DataStructureType.ATOMIC_REF, null), create); } /** @@ -457,23 +557,29 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { * @param name Atomic reference name. * @throws IgniteCheckedException If removing failed. */ - public final void removeAtomicReference(String name) throws IgniteCheckedException { + public final void removeAtomicReference(final String name) throws IgniteCheckedException { assert name != null; assert dsCacheCtx != null; - dsCacheCtx.gate().enter(); + removeDataStructure(new IgniteCallable<Void>() { + @Override public Void call() throws Exception { + dsCacheCtx.gate().enter(); - try { - GridCacheInternal key = new GridCacheInternalKeyImpl(name); + try { + GridCacheInternal key = new GridCacheInternalKeyImpl(name); - removeInternal(key, GridCacheAtomicReferenceValue.class); - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to remove atomic reference by name: " + name, e); - } - finally { - dsCacheCtx.gate().leave(); - } + removeInternal(key, GridCacheAtomicReferenceValue.class); + } + catch (Exception e) { + throw new IgniteCheckedException("Failed to remove atomic reference by name: " + name, e); + } + finally { + dsCacheCtx.gate().leave(); + } + + return null; + } + }, name, DataStructureType.ATOMIC_REF, null); } /** @@ -495,66 +601,61 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); - dsCacheCtx.gate().enter(); + final GridCacheInternalKeyImpl key = new GridCacheInternalKeyImpl(name); - try { - final GridCacheInternalKeyImpl key = new GridCacheInternalKeyImpl(name); + // Check type of structure received by key from local cache. + IgniteAtomicStamped atomicStamped = cast(dsMap.get(key), IgniteAtomicStamped.class); - // Check type of structure received by key from local cache. - IgniteAtomicStamped atomicStamped = cast(dsMap.get(key), IgniteAtomicStamped.class); + if (atomicStamped != null) + return atomicStamped; - if (atomicStamped != null) - return atomicStamped; + return getAtomic(new Callable<IgniteAtomicStamped<T, S>>() { + @Override public IgniteAtomicStamped<T, S> call() throws Exception { + dsCacheCtx.gate().enter(); - return CU.outTx(new Callable<IgniteAtomicStamped<T, S>>() { - @Override public IgniteAtomicStamped<T, S> call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicStampedValue val = cast(dsView.get(key), - GridCacheAtomicStampedValue.class); + try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicStampedValue val = cast(dsView.get(key), + GridCacheAtomicStampedValue.class); - // Check that atomic stamped hasn't been created in other thread yet. - GridCacheAtomicStampedEx stmp = cast(dsMap.get(key), - GridCacheAtomicStampedEx.class); + // Check that atomic stamped hasn't been created in other thread yet. + GridCacheAtomicStampedEx stmp = cast(dsMap.get(key), + GridCacheAtomicStampedEx.class); - if (stmp != null) { - assert val != null; + if (stmp != null) { + assert val != null; - return stmp; - } + return stmp; + } - if (val == null && !create) - return null; + if (val == null && !create) + return null; - if (val == null) { - val = new GridCacheAtomicStampedValue(initVal, initStamp); + if (val == null) { + val = new GridCacheAtomicStampedValue(initVal, initStamp); - dsView.putx(key, val); - } + dsView.putx(key, val); + } - stmp = new GridCacheAtomicStampedImpl(name, key, atomicStampedView, dsCacheCtx); + stmp = new GridCacheAtomicStampedImpl(name, key, atomicStampedView, dsCacheCtx); - dsMap.put(key, stmp); + dsMap.put(key, stmp); - tx.commit(); + tx.commit(); - return stmp; - } - catch (Error | Exception e) { - dsMap.remove(key); + return stmp; + } + catch (Error | Exception e) { + dsMap.remove(key); - U.error(log, "Failed to make atomic stamped: " + name, e); + U.error(log, "Failed to make atomic stamped: " + name, e); - throw e; - } + throw e; } - }, dsCacheCtx); - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to get atomic stamped by name: " + name, e); - } - finally { - dsCacheCtx.gate().leave(); - } + finally { + dsCacheCtx.gate().leave(); + } + } + }, new DataStructureInfo(name, DataStructureType.ATOMIC_STAMPED, null), create); } /** @@ -563,23 +664,29 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { * @param name Atomic stamped name. * @throws IgniteCheckedException If removing failed. */ - public final void removeAtomicStamped(String name) throws IgniteCheckedException { + public final void removeAtomicStamped(final String name) throws IgniteCheckedException { assert name != null; assert dsCacheCtx != null; - dsCacheCtx.gate().enter(); + removeDataStructure(new IgniteCallable<Void>() { + @Override public Void call() throws Exception { + dsCacheCtx.gate().enter(); - try { - GridCacheInternal key = new GridCacheInternalKeyImpl(name); + try { + GridCacheInternal key = new GridCacheInternalKeyImpl(name); - removeInternal(key, GridCacheAtomicStampedValue.class); - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to remove atomic stamped by name: " + name, e); - } - finally { - dsCacheCtx.gate().leave(); - } + removeInternal(key, GridCacheAtomicStampedValue.class); + } + catch (Exception e) { + throw new IgniteCheckedException("Failed to remove atomic stamped by name: " + name, e); + } + finally { + dsCacheCtx.gate().leave(); + } + + return null; + } + }, name, DataStructureType.ATOMIC_STAMPED, null); } /** @@ -594,9 +701,9 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { */ @SuppressWarnings("unchecked") public final <T> IgniteQueue<T> queue(final String name, - @Nullable IgniteCollectionConfiguration cfg, + @Nullable final IgniteCollectionConfiguration cfg, int cap, - boolean create) + final boolean create) throws IgniteCheckedException { A.notNull(name, "name"); @@ -610,48 +717,117 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { throw new IgniteCheckedException("Cache for collection is not configured: " + cfg.getCacheName()); } + DataStructureInfo dsInfo = new DataStructureInfo(name, + DataStructureType.QUEUE, + create ? new QueueInfo(cfg.getCacheName(), cfg.isCollocated(), cap) : null); + + final int cap0 = cap; + + return getCollection(new IgniteClosureX<GridCacheContext, IgniteQueue<T>>() { + @Override public IgniteQueue<T> applyx(GridCacheContext ctx) throws IgniteCheckedException { + return ctx.dataStructures().queue(name, cap0, create && cfg.isCollocated(), create); + } + }, dsInfo, create); + } + + /** + * @param name Queue name. + * @param cctx Queue cache context. + * @throws IgniteCheckedException + */ + public void removeQueue(final String name, final GridCacheContext cctx) throws IgniteCheckedException { + assert name != null; + assert cctx != null; + + IgniteCallable<GridCacheQueueHeader> rmv = new IgniteCallable<GridCacheQueueHeader>() { + @Override public GridCacheQueueHeader call() throws Exception { + return (GridCacheQueueHeader)retryRemove(cctx.cache(), new GridCacheQueueHeaderKey(name)); + } + }; + + CIX1<GridCacheQueueHeader> afterRmv = new CIX1<GridCacheQueueHeader>() { + @Override public void applyx(GridCacheQueueHeader hdr) throws IgniteCheckedException { + if (hdr.empty()) + return; + + GridCacheQueueAdapter.removeKeys(cctx.cache(), + hdr.id(), + name, + hdr.collocated(), + hdr.head(), + hdr.tail(), + 0); + } + }; + + removeDataStructure(rmv, name, DataStructureType.QUEUE, afterRmv); + } + + /** + * @param c Closure creating collection. + * @param dsInfo Data structure info. + * @param create Create flag. + * @return Collection instance. + * @throws IgniteCheckedException If failed. + */ + @Nullable private <T> T getCollection(final IgniteClosureX<GridCacheContext, T> c, + DataStructureInfo dsInfo, + boolean create) + throws IgniteCheckedException + { Map<String, DataStructureInfo> dsMap = utilityCache.get(DATA_STRUCTURES_KEY); - if (!create && (dsMap == null || !dsMap.containsKey(name))) + if (!create && (dsMap == null || !dsMap.containsKey(dsInfo.name))) return null; - DataStructureInfo dsInfo = new DataStructureInfo(name, - DataStructureType.QUEUE, - create ? new QueueInfo(cfg.isCollocated(), cap, cfg.getCacheName()) : null); - - IgniteCheckedException err = validateDataStructure(utilityCache.get(DATA_STRUCTURES_KEY), dsInfo, create); + IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, create); if (err != null) throw err; + T col; + try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { - dsMap = utilityCache.get(DATA_STRUCTURES_KEY); + final String cacheName; - validateDataStructure(dsMap, dsInfo, create); + if (create) { + T2<String, IgniteCheckedException> res = + utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get(); - String cacheName; + err = res.get2(); - if (create) { - if (dsMap == null) { - dsMap = new HashMap<>(); + if (err != null) + throw err; - dsMap.put(name, dsInfo); - } + cacheName = res.get1(); + } + else { + T3<Boolean, String, IgniteCheckedException> res = + utilityCache.invoke(DATA_STRUCTURES_KEY, new ContainsCollectionProcessor(dsInfo)).get(); + + err = res.get3(); + + if (err != null) + throw err; - DataStructureInfo info = dsMap.get(name); + if (!res.get1()) + return null; - if (info == null) - dsMap.put(name, info); + cacheName = res.get2(); } - else if (dsMap == null || !dsMap.containsKey(name)) - return null; + + final GridCacheContext cacheCtx = ctx.cache().internalCache(cacheName).context(); + + col = ctx.closure().callLocalSafe(new Callable<T>() { + @Override public T call() throws Exception { + return c.applyx(cacheCtx); + } + }, false).get(); tx.commit(); } - GridCacheAdapter cache = cacheForCollection(cfg); - - return cache.context().dataStructures().queue(name, cap, create && cfg.isCollocated(), create); + return col; } /** @@ -670,7 +846,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { DataStructureInfo oldInfo = dsMap.get(info.name); if (oldInfo != null) - return oldInfo.validateConfiguration(info, create); + return oldInfo.validate(info, create); return null; } @@ -702,66 +878,61 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); - dsCacheCtx.gate().enter(); + final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); - try { - final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); + // Check type of structure received by key from local cache. + GridCacheCountDownLatchEx latch = cast(dsMap.get(key), GridCacheCountDownLatchEx.class); - // Check type of structure received by key from local cache. - IgniteCountDownLatch latch = cast(dsMap.get(key), IgniteCountDownLatch.class); + if (latch != null) + return latch; - if (latch != null) - return latch; + return getAtomic(new Callable<IgniteCountDownLatch>() { + @Override public IgniteCountDownLatch call() throws Exception { + dsCacheCtx.gate().enter(); - return CU.outTx(new Callable<IgniteCountDownLatch>() { - @Override public IgniteCountDownLatch call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheCountDownLatchValue val = cast(dsView.get(key), - GridCacheCountDownLatchValue.class); + try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheCountDownLatchValue val = cast(dsView.get(key), + GridCacheCountDownLatchValue.class); - // Check that count down hasn't been created in other thread yet. - GridCacheCountDownLatchEx latch = cast(dsMap.get(key), GridCacheCountDownLatchEx.class); + // Check that count down hasn't been created in other thread yet. + GridCacheCountDownLatchEx latch = cast(dsMap.get(key), GridCacheCountDownLatchEx.class); - if (latch != null) { - assert val != null; + if (latch != null) { + assert val != null; - return latch; - } + return latch; + } - if (val == null && !create) - return null; + if (val == null && !create) + return null; - if (val == null) { - val = new GridCacheCountDownLatchValue(cnt, autoDel); + if (val == null) { + val = new GridCacheCountDownLatchValue(cnt, autoDel); - dsView.putx(key, val); - } + dsView.putx(key, val); + } - latch = new GridCacheCountDownLatchImpl(name, val.get(), val.initialCount(), - val.autoDelete(), key, cntDownLatchView, dsCacheCtx); + latch = new GridCacheCountDownLatchImpl(name, val.get(), val.initialCount(), + val.autoDelete(), key, cntDownLatchView, dsCacheCtx); - dsMap.put(key, latch); + dsMap.put(key, latch); - tx.commit(); + tx.commit(); - return latch; - } - catch (Error | Exception e) { - dsMap.remove(key); + return latch; + } + catch (Error | Exception e) { + dsMap.remove(key); - U.error(log, "Failed to create count down latch: " + name, e); + U.error(log, "Failed to create count down latch: " + name, e); - throw e; - } + throw e; } - }, dsCacheCtx); - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to get count down latch by name: " + name, e); - } - finally { - dsCacheCtx.gate().leave(); - } + finally { + dsCacheCtx.gate().leave(); + } + } + }, new DataStructureInfo(name, DataStructureType.COUNT_DOWN_LATCH, null), create); } /** @@ -774,47 +945,40 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { assert name != null; assert dsCacheCtx != null; - dsCacheCtx.gate().enter(); + removeDataStructure(new IgniteCallable<Void>() { + @Override + public Void call() throws Exception { + GridCacheInternal key = new GridCacheInternalKeyImpl(name); - try { - CU.outTx(new Callable<Boolean>() { - @Override public Boolean call() throws Exception { - GridCacheInternal key = new GridCacheInternalKeyImpl(name); + dsCacheCtx.gate().enter(); - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { - // Check correctness type of removable object. - GridCacheCountDownLatchValue val = + try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + // Check correctness type of removable object. + GridCacheCountDownLatchValue val = cast(dsView.get(key), GridCacheCountDownLatchValue.class); - if (val != null) { - if (val.get() > 0) { - throw new IgniteCheckedException("Failed to remove count down latch " + + if (val != null) { + if (val.get() > 0) { + throw new IgniteCheckedException("Failed to remove count down latch " + "with non-zero count: " + val.get()); - } + } - dsView.removex(key); + dsView.removex(key); - tx.commit(); - } - else - tx.setRollbackOnly(); + tx.commit(); + } else + tx.setRollbackOnly(); - return val != null; - } - catch (Error | Exception e) { - U.error(log, "Failed to remove data structure: " + key, e); + return null; + } catch (Error | Exception e) { + U.error(log, "Failed to remove data structure: " + key, e); - throw e; - } + throw e; + } finally { + dsCacheCtx.gate().leave(); } - }, dsCacheCtx); - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to remove count down latch by name: " + name, e); - } - finally { - dsCacheCtx.gate().leave(); - } + } + }, name, DataStructureType.COUNT_DOWN_LATCH, null); } /** @@ -926,17 +1090,66 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { */ @SuppressWarnings("unchecked") @Nullable public <T> IgniteSet<T> set(final String name, - @Nullable IgniteCollectionConfiguration cfg, + @Nullable final IgniteCollectionConfiguration cfg, final boolean create) throws IgniteCheckedException { A.notNull(name, "name"); - if (create) + if (create) { A.notNull(cfg, "cfg"); - GridCacheAdapter cache = cacheForCollection(cfg); + if (ctx.cache().publicCache(cfg.getCacheName()) == null) + throw new IgniteCheckedException("Cache for collection is not configured: " + cfg.getCacheName()); + } - return cache.context().dataStructures().set(name, create ? cfg.isCollocated() : false, create); + DataStructureInfo dsInfo = new DataStructureInfo(name, + DataStructureType.SET, + create ? new CollectionInfo(cfg.getCacheName(), cfg.isCollocated()) : null); + + return getCollection(new CX1<GridCacheContext, IgniteSet<T>>() { + @Override public IgniteSet<T> applyx(GridCacheContext cctx) throws IgniteCheckedException { + return cctx.dataStructures().set(name, create ? cfg.isCollocated() : false, create); + } + }, dsInfo, create); + } + + /** + * @param name Set name. + * @param cctx Set cache context. + * @throws IgniteCheckedException + */ + public void removeSet(final String name, final GridCacheContext cctx) throws IgniteCheckedException { + assert name != null; + assert cctx != null; + + IgniteCallable<GridCacheSetHeader> rmv = new IgniteCallable<GridCacheSetHeader>() { + @Override public GridCacheSetHeader call() throws Exception { + return (GridCacheSetHeader)retryRemove(cctx.cache(), new GridCacheSetHeaderKey(name)); + } + }; + + CIX1<GridCacheSetHeader> afterRmv = new CIX1<GridCacheSetHeader>() { + @Override public void applyx(GridCacheSetHeader hdr) throws IgniteCheckedException { + cctx.dataStructures().removeSetData(hdr.id()); + } + }; + + removeDataStructure(rmv, name, DataStructureType.SET, afterRmv); + } + + /** + * @param cache Cache. + * @param key Key to remove. + * @throws IgniteCheckedException If failed. + * @return Removed value. + */ + @SuppressWarnings("unchecked") + @Nullable private <T> T retryRemove(final GridCache cache, final Object key) throws IgniteCheckedException { + return retry(log, new Callable<T>() { + @Nullable @Override public T call() throws Exception { + return (T)cache.remove(key); + } + }); } /** @@ -953,10 +1166,10 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { try { return call.call(); } - catch (ClusterGroupEmptyException e) { - throw new IgniteException(e); + catch (ClusterGroupEmptyCheckedException e) { + throw new IgniteCheckedException(e); } - catch (IgniteTxRollbackException | CachePartialUpdateCheckedException | ClusterTopologyException e) { + catch (IgniteTxRollbackCheckedException | CachePartialUpdateCheckedException | ClusterTopologyCheckedException e) { if (cnt++ == MAX_UPDATE_RETRIES) throw e; else { @@ -967,11 +1180,11 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { } } } - catch (IgniteCheckedException | IgniteException e) { + catch (IgniteCheckedException e) { throw e; } catch (Exception e) { - throw new IgniteException(e); + throw new IgniteCheckedException(e); } } @@ -1005,22 +1218,6 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { } /** - * @param cfg Collection configuration. - * @return Cache to use for collection. - * @throws IgniteCheckedException If cache is not configured. - */ - private GridCacheAdapter cacheForCollection(IgniteCollectionConfiguration cfg) throws IgniteCheckedException { - if (ctx.cache().publicCache(cfg.getCacheName()) == null) - throw new IgniteCheckedException("Cache for collection is not configured: " + cfg.getCacheName()); - - GridCacheAdapter cache = ctx.cache().internalCache(cfg.getCacheName()); - - assert cache != null : cfg.getCacheName(); - - return cache; - } - - /** * @throws IgniteException If atomics configuration is not provided. */ private void checkAtomicsConfiguration() throws IgniteException { @@ -1046,6 +1243,9 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { ATOMIC_STAMPED(IgniteAtomicStamped.class.getSimpleName()), /** */ + COUNT_DOWN_LATCH(IgniteCountDownLatch.class.getSimpleName()), + + /** */ QUEUE(IgniteQueue.class.getSimpleName()), /** */ @@ -1083,7 +1283,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { /** * */ - static class SetInfo implements Externalizable { + static class CollectionInfo implements Externalizable { /** */ private boolean collocated; @@ -1093,41 +1293,43 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { /** * Required by {@link Externalizable}. */ - public SetInfo() { + public CollectionInfo() { // No-op. } /** * @param collocated Collocated flag. */ - public SetInfo(boolean collocated) { + public CollectionInfo(String cacheName, boolean collocated) { + this.cacheName = cacheName; this.collocated = collocated; } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { collocated = in.readBoolean(); + cacheName = U.readString(in); } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeBoolean(collocated); + U.writeString(out, cacheName); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CollectionInfo.class, this); } } /** * */ - static class QueueInfo implements Externalizable { - /** */ - private boolean collocated; - + static class QueueInfo extends CollectionInfo { /** */ private int cap; - /** */ - private String cacheName; - /** * Required by {@link Externalizable}. */ @@ -1140,24 +1342,29 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { * @param cap Queue capacity. * @param cacheName Cache name. */ - public QueueInfo(boolean collocated, int cap, String cacheName) { - this.collocated = collocated; + public QueueInfo(String cacheName, boolean collocated, int cap) { + super(cacheName, collocated); + this.cap = cap; - this.cacheName = cacheName; } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - collocated = in.readBoolean(); + super.readExternal(in); + cap = in.readInt(); - cacheName = U.readString(in); } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeBoolean(collocated); + super.writeExternal(out); + out.writeInt(cap); - U.writeString(out, cacheName); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueueInfo.class, this, "super", super.toString()); } } @@ -1200,42 +1407,33 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { * @param create Create flag. * @return Exception if validation failed. */ - @Nullable IgniteCheckedException validateConfiguration(DataStructureInfo dsInfo, boolean create) { + @Nullable IgniteCheckedException validate(DataStructureInfo dsInfo, boolean create) { if (type != dsInfo.type) { return new IgniteCheckedException("Another data structure with the same name already created " + - "[name= " + name + - ", new= " + dsInfo.type.className() + - ", existing=" + type.className() + ']'); + "[name=" + name + + ", newType=" + dsInfo.type.className() + + ", existingType=" + type.className() + ']'); } if (create) { - if (type == DataStructureType.QUEUE ) { - QueueInfo oldInfo = (QueueInfo)info; - QueueInfo newInfo = (QueueInfo)dsInfo.info; + if (type == DataStructureType.QUEUE || type == DataStructureType.SET) { + CollectionInfo oldInfo = (CollectionInfo)info; + CollectionInfo newInfo = (CollectionInfo)dsInfo.info; if (oldInfo.collocated != newInfo.collocated) { - return new IgniteCheckedException("Another queue with the same name but different " + - "configuration already created [name= " + name + - ", newCollocated= " + newInfo.collocated + + return new IgniteCheckedException("Another collection with the same name but different " + + "configuration already created [name=" + name + + ", newCollocated=" + newInfo.collocated + ", existingCollocated=" + newInfo.collocated + ']'); } - if (oldInfo.cap != newInfo.cap) { - return new IgniteCheckedException("Another queue with the same name but different " + - "configuration already created [name= " + name + - ", newCapacity= " + newInfo.cap+ - ", existingCapacity=" + newInfo.cap + ']'); - } - } - else if (type == DataStructureType.SET ) { - SetInfo oldInfo = (SetInfo)info; - SetInfo newInfo = (SetInfo)dsInfo.info; - - if (oldInfo.collocated != newInfo.collocated) { - return new IgniteCheckedException("Another set with the same name but different " + - "configuration already created [name= " + name + - ", newCollocated= " + newInfo.collocated + - ", existingCollocated=" + newInfo.collocated + ']'); + if (type == DataStructureType.QUEUE) { + if (((QueueInfo)oldInfo).cap != ((QueueInfo)newInfo).cap) { + return new IgniteCheckedException("Another queue with the same name but different " + + "configuration already created [name=" + name + + ", newCapacity=" + ((QueueInfo)newInfo).cap + + ", existingCapacity=" + ((QueueInfo)oldInfo).cap + ']'); + } } } } @@ -1256,12 +1454,140 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { type = DataStructureType.fromOrdinal(in.readByte()); info = in.readObject(); } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DataStructureInfo.class, this); + } + } + + /** + * + */ + static class ContainsAtomicProcessor implements + EntryProcessor<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>, T2<Boolean, IgniteCheckedException>>, + Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private DataStructureInfo info; + + /** + * @param info Data structure information. + */ + ContainsAtomicProcessor(DataStructureInfo info) { + assert info != null; + + this.info = info; + } + + /** + * Required by {@link Externalizable}. + */ + public ContainsAtomicProcessor() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public T2<Boolean, IgniteCheckedException> process( + MutableEntry<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> entry, + Object... args) + throws EntryProcessorException + { + Map<String, DataStructureInfo> map = entry.getValue(); + + if (map == null) + return new T2<>(false, null); + + DataStructureInfo oldInfo = map.get(info.name); + + if (oldInfo == null) + return new T2<>(false, null); + + return new T2<>(true, oldInfo.validate(info, false)); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + info.writeExternal(out); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + info = new DataStructureInfo(); + + info.readExternal(in); + } + } + + /** + * + */ + static class ContainsCollectionProcessor implements + EntryProcessor<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>, T3<Boolean, String, IgniteCheckedException>>, + Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private DataStructureInfo info; + + /** + * @param info Data structure information. + */ + ContainsCollectionProcessor(DataStructureInfo info) { + assert info != null; + + this.info = info; + } + + /** + * Required by {@link Externalizable}. + */ + public ContainsCollectionProcessor() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public T3<Boolean, String, IgniteCheckedException> process( + MutableEntry<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> entry, + Object... args) + throws EntryProcessorException + { + Map<String, DataStructureInfo> map = entry.getValue(); + + if (map == null) + return new T3<>(false, null, null); + + DataStructureInfo oldInfo = map.get(info.name); + + if (oldInfo == null) + return new T3<>(false, null, null); + + assert oldInfo.info instanceof CollectionInfo : oldInfo.info; + + return new T3<>(true, ((CollectionInfo)oldInfo.info).cacheName, oldInfo.validate(info, false)); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + info.writeExternal(out); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + info = new DataStructureInfo(); + + info.readExternal(in); + } } + /** * */ static class AddAtomicProcessor implements - EntryProcessor<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>, IgniteException>, + EntryProcessor<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>, IgniteCheckedException>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -1273,6 +1599,8 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { * @param info Data structure information. */ AddAtomicProcessor(DataStructureInfo info) { + assert info != null; + this.info = info; } @@ -1284,7 +1612,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override public IgniteException process( + @Override public IgniteCheckedException process( MutableEntry<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> entry, Object... args) throws EntryProcessorException @@ -1313,7 +1641,154 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { return null; } - return null; + return oldInfo.validate(info, true); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + info.writeExternal(out); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + info = new DataStructureInfo(); + + info.readExternal(in); + } + } + + /** + * + */ + static class AddCollectionProcessor implements + EntryProcessor<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>, T2<String, IgniteCheckedException>>, + Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private DataStructureInfo info; + + /** + * @param info Data structure information. + */ + AddCollectionProcessor(DataStructureInfo info) { + assert info != null; + assert info.info instanceof CollectionInfo; + + this.info = info; + } + + /** + * Required by {@link Externalizable}. + */ + public AddCollectionProcessor() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public T2<String, IgniteCheckedException> process( + MutableEntry<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> entry, + Object... args) + throws EntryProcessorException + { + Map<String, DataStructureInfo> map = entry.getValue(); + + CollectionInfo colInfo = (CollectionInfo)info.info; + + if (map == null) { + map = new HashMap<>(); + + map.put(info.name, info); + + entry.setValue(map); + + return new T2<>(colInfo.cacheName, null); + } + + DataStructureInfo oldInfo = map.get(info.name); + + if (oldInfo == null) { + map = new HashMap<>(map); + + map.put(info.name, info); + + entry.setValue(map); + + return new T2<>(colInfo.cacheName, null); + } + + return new T2<>(colInfo.cacheName, oldInfo.validate(info, true)); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + info.writeExternal(out); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + info = new DataStructureInfo(); + + info.readExternal(in); + } + } + + /** + * + */ + static class RemoveDataStructureProcessor implements + EntryProcessor<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>, T2<Boolean, IgniteCheckedException>>, + Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private DataStructureInfo info; + + /** + * @param info Data structure information. + */ + RemoveDataStructureProcessor(DataStructureInfo info) { + assert info != null; + + this.info = info; + } + + /** + * Required by {@link Externalizable}. + */ + public RemoveDataStructureProcessor() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public T2<Boolean, IgniteCheckedException> process( + MutableEntry<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> entry, + Object... args) + throws EntryProcessorException + { + Map<String, DataStructureInfo> map = entry.getValue(); + + if (map == null) + return new T2<>(false, null); + + DataStructureInfo oldInfo = map.get(info.name); + + if (oldInfo == null) + return new T2<>(false, null); + + IgniteCheckedException err = oldInfo.validate(info, false); + + if (err == null) { + map = new HashMap<>(map); + + map.remove(info.name); + + entry.setValue(map); + } + + return new T2<>(true, err); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9e893982/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java index f614793..4c9e330 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java @@ -348,7 +348,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp checkRemoved(t.get1()); - removeKeys(id, queueName, collocated, t.get1(), t.get2(), batchSize); + removeKeys(cache, id, queueName, collocated, t.get1(), t.get2(), batchSize); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -382,6 +382,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp } /** + * @param cache Queue cache. * @param id Queue unique ID. * @param name Queue name. * @param collocated Collocation flag. @@ -391,7 +392,8 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - private void removeKeys( + static void removeKeys( + GridCacheAdapter cache, IgniteUuid id, String name, boolean collocated, @@ -527,19 +529,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp return; try { - GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.remove(new GridCacheQueueHeaderKey(queueName), null); - - rmvd = true; - - if (hdr == null || hdr.empty()) - return; - - removeKeys(hdr.id(), - queueName, - hdr.collocated(), - hdr.head(), - hdr.tail(), - 0); + cctx.kernalContext().dataStructures().removeQueue(queueName, cctx); } catch (IgniteCheckedException e) { throw U.convertException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9e893982/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index 06a0845..a52f8e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -335,7 +335,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite if (rmvd) return; - ctx.dataStructures().removeSet(name); + ctx.kernalContext().dataStructures().removeSet(name, ctx); } catch (IgniteCheckedException e) { throw U.convertException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9e893982/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueJoinedNodeSelfAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueJoinedNodeSelfAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueJoinedNodeSelfAbstractTest.java index e4e0811..1c3dc50 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueJoinedNodeSelfAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueJoinedNodeSelfAbstractTest.java @@ -266,7 +266,7 @@ public abstract class GridCacheQueueJoinedNodeSelfAbstractTest extends IgniteCol else fail("Unexpected error: " + e); } - catch (IgniteCheckedException e) { + catch (Exception e) { error("Failed to get value from the queue", e); } finally { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9e893982/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java index b625989..fb40cfc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.datastructures; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.testframework.*; @@ -27,20 +28,40 @@ import java.io.*; import java.util.*; import java.util.concurrent.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; import static org.apache.ignite.cache.CacheMode.*; /** * */ -public class IgniteDataStructureUniqueNameTest extends IgniteAtomicsAbstractTest { +public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractTest { /** {@inheritDoc} */ - @Override protected CacheMode atomicsCacheMode() { + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected CacheMode collectionCacheMode() { return PARTITIONED; } /** {@inheritDoc} */ - @Override protected int gridCount() { - return 3; + @Override protected CacheAtomicityMode collectionCacheAtomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + IgniteAtomicConfiguration atomicCfg = new IgniteAtomicConfiguration(); + + atomicCfg.setBackups(1); + atomicCfg.setCacheMode(PARTITIONED); + + cfg.setAtomicConfiguration(atomicCfg); + + return cfg; } /** @@ -58,6 +79,141 @@ public class IgniteDataStructureUniqueNameTest extends IgniteAtomicsAbstractTest } /** + * @throws Exception If failed. + */ + public void testCreateRemove() throws Exception { + final String name = IgniteUuid.randomUuid().toString(); + + final Ignite ignite = ignite(0); + + assertNull(ignite.atomicLong(name, 0, false)); + + IgniteAtomicReference<Integer> ref = ignite.atomicReference(name, 0, true); + + assertNotNull(ref); + + assertSame(ref, ignite.atomicReference(name, 0, true)); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + ignite.atomicLong(name, 0, false); + + return null; + } + }, IgniteException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + ignite.atomicLong(name, 0, true); + + return null; + } + }, IgniteException.class, null); + + ref.close(); + + IgniteAtomicLong atomicLong = ignite.atomicLong(name, 0, true); + + assertNotNull(atomicLong); + + assertSame(atomicLong, ignite.atomicLong(name, 0, true)); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + ignite.atomicReference(name, 0, false); + + return null; + } + }, IgniteException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + ignite.queue(name, config(false), 0, true); + + return null; + } + }, IgniteException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + ignite.queue(name, null, 0, false); + + return null; + } + }, IgniteException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + ignite.set(name, config(false), true); + + return null; + } + }, IgniteException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + ignite.set(name, null, false); + + return null; + } + }, IgniteException.class, null); + + atomicLong.close(); + + IgniteQueue<Integer> q = ignite.queue(name, config(false), 0, true); + + assertNotNull(q); + + assertSame(q, ignite.queue(name, config(false), 0, true)); + + assertSame(q, ignite.queue(name, null, 0, false)); + + q.close(); + + assertNull(ignite.set(name, null, false)); + + IgniteSet<Integer> set = ignite.set(name, config(false), true); + + assertNotNull(set); + + assertSame(set, ignite.set(name, config(false), true)); + + assertSame(set, ignite.set(name, null, false)); + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + ignite.atomicReference(name, 0, false); + + return null; + } + }, IgniteException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + ignite.queue(name, config(false), 0, true); + + return null; + } + }, IgniteException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override + public Void call() throws Exception { + ignite.queue(name, null, 0, false); + + return null; + } + }, IgniteException.class, null); + + set.close(); + + ref = ignite.atomicReference(name, 0, true); + + assertNotNull(ref); + + assertSame(ref, ignite.atomicReference(name, 0, true)); + } + + /** * @param singleGrid If {@code true} uses single grid. * @throws Exception If failed. */