http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java index aa96213..cf87a1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java @@ -28,7 +28,6 @@ import org.apache.ignite.internal.util.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; import org.apache.ignite.transactions.*; -import org.apache.ignite.internal.processors.cache.query.continuous.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.util.typedef.*; @@ -36,12 +35,12 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.processor.*; import java.io.*; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; +import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.internal.processors.cache.CacheFlag.*; import static org.apache.ignite.transactions.IgniteTxConcurrency.*; import static org.apache.ignite.transactions.IgniteTxIsolation.*; @@ -52,6 +51,10 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*; * Manager of data structures. 
*/ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { + /** */ + public static final CacheDataStructuresConfigurationKey DATA_STRUCTURES_KEY = + new CacheDataStructuresConfigurationKey(); + /** Initial capacity. */ private static final int INITIAL_CAPACITY = 10; @@ -70,12 +73,6 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { /** Queues map. */ private final ConcurrentMap<IgniteUuid, GridCacheQueueProxy> queuesMap; - /** Query notifying about queue update. */ - private GridCacheContinuousQueryAdapter queueQry; - - /** Queue query creation guard. */ - private final AtomicBoolean queueQryGuard = new AtomicBoolean(); - /** Cache contains only {@code GridCacheAtomicValue}. */ private CacheProjection<GridCacheInternalKey, GridCacheAtomicLongValue> atomicLongView; @@ -91,18 +88,6 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { /** Cache contains only entry {@code GridCacheSequenceValue}. */ private CacheProjection<GridCacheInternalKey, GridCacheAtomicSequenceValue> seqView; - /** Cache contains only entry {@code GridCacheQueueHeader}. */ - private CacheProjection<GridCacheQueueHeaderKey, GridCacheQueueHeader> queueHdrView; - - /** Busy lock. */ - private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); - - /** Init latch. */ - private final CountDownLatch initLatch = new CountDownLatch(1); - - /** Init flag. */ - private boolean initFlag; - /** Set keys used for set iteration. 
*/ private ConcurrentMap<IgniteUuid, GridConcurrentHashSet<GridCacheSetItemKey>> setDataMap = new ConcurrentHashMap8<>(); @@ -111,10 +96,13 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { private final ConcurrentMap<IgniteUuid, GridCacheSetProxy> setsMap; /** */ - private GridCacheContext atomicsCtx; + private GridCacheContext atomicsCacheCtx; + + /** */ + private final IgniteAtomicConfiguration atomicCfg; /** */ - private final IgniteAtomicConfiguration cfg; + private IgniteCache<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> utilityCache; /** * @param ctx Context. @@ -126,15 +114,20 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { queuesMap = new ConcurrentHashMap8<>(INITIAL_CAPACITY); setsMap = new ConcurrentHashMap8<>(INITIAL_CAPACITY); - cfg = ctx.config().getAtomicConfiguration(); + atomicCfg = ctx.config().getAtomicConfiguration(); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void start() throws IgniteCheckedException { - super.start(); + if (ctx.config().isDaemon()) + return; - if (cfg != null) { + utilityCache = ctx.cache().jcache(CU.UTILITY_CACHE_NAME); + + assert utilityCache != null; + + if (atomicCfg != null) { GridCache atomicsCache = ctx.cache().atomicsCache(); assert atomicsCache != null; @@ -156,71 +149,14 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { seqView = atomicsCache.projection (GridCacheInternalKey.class, GridCacheAtomicSequenceValue.class).flagsOn(CLONE); - atomicsCtx = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME).context(); + atomicsCacheCtx = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME).context(); } } - /* - TODO IGNITE-6 - @SuppressWarnings("unchecked") - @Override protected void onKernalStart0() { - try { - dsView = cctx.cache().projection - (GridCacheInternal.class, GridCacheInternal.class).flagsOn(CLONE); - - if (transactionalWithNear()) { - cntDownLatchView = 
cctx.cache().projection - (GridCacheInternalKey.class, GridCacheCountDownLatchValue.class).flagsOn(CLONE); - - atomicLongView = cctx.cache().projection - (GridCacheInternalKey.class, GridCacheAtomicLongValue.class).flagsOn(CLONE); - - atomicRefView = cctx.cache().projection - (GridCacheInternalKey.class, GridCacheAtomicReferenceValue.class).flagsOn(CLONE); - - atomicStampedView = cctx.cache().projection - (GridCacheInternalKey.class, GridCacheAtomicStampedValue.class).flagsOn(CLONE); - - seqView = cctx.cache().projection - (GridCacheInternalKey.class, GridCacheAtomicSequenceValue.class).flagsOn(CLONE); - } - - if (supportsQueue()) - queueHdrView = cctx.cache().projection - (GridCacheQueueHeaderKey.class, GridCacheQueueHeader.class).flagsOn(CLONE); - - initFlag = true; - } - finally { - initLatch.countDown(); - } - } - */ - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); - - initFlag = true; - - initLatch.countDown(); - } - /** {@inheritDoc} */ @Override public void onKernalStop(boolean cancel) { super.onKernalStop(cancel); - busyLock.block(); - - if (queueQry != null) { - try { - queueQry.close(); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to cancel queue header query.", e); - } - } - for (GridCacheQueueProxy q : queuesMap.values()) q.delegate().onKernalStop(); } @@ -234,15 +170,20 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { * @return Sequence. * @throws IgniteCheckedException If loading failed. 
*/ - public final IgniteAtomicSequence sequence(final String name, final long initVal, - final boolean create) throws IgniteCheckedException { - waitInitialization(); + public final IgniteAtomicSequence sequence(final String name, + final long initVal, + final boolean create) + throws IgniteCheckedException + { + A.notNull(name, "name"); checkAtomicsConfiguration(); - final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); + atomicsCacheCtx.gate().enter(); try { + final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); + // Check type of structure received by key from local cache. IgniteAtomicSequence val = cast(dsMap.get(key), IgniteAtomicSequence.class); @@ -251,9 +192,8 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { return CU.outTx(new Callable<IgniteAtomicSequence>() { @Override public IgniteAtomicSequence call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(atomicsCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicSequenceValue seqVal = cast(dsView.get(key), - GridCacheAtomicSequenceValue.class); + try (IgniteTx tx = CU.txStartInternal(atomicsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheAtomicSequenceValue seqVal = cast(dsView.get(key), GridCacheAtomicSequenceValue.class); // Check that sequence hasn't been created in other thread yet. GridCacheAtomicSequenceEx seq = cast(dsMap.get(key), GridCacheAtomicSequenceEx.class); @@ -268,7 +208,8 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { return null; // We should use offset because we already reserved left side of range. - long off = cfg.getAtomicSequenceReserveSize() > 1 ? cfg.getAtomicSequenceReserveSize() - 1 : 1; + long off = atomicCfg.getAtomicSequenceReserveSize() > 1 ? 
+ atomicCfg.getAtomicSequenceReserveSize() - 1 : 1; long upBound; long locCntr; @@ -297,8 +238,8 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { seq = new GridCacheAtomicSequenceImpl(name, key, seqView, - atomicsCtx, - cfg.getAtomicSequenceReserveSize(), + atomicsCacheCtx, + atomicCfg.getAtomicSequenceReserveSize(), locCntr, upBound); @@ -316,11 +257,14 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { throw e; } } - }, atomicsCtx); + }, atomicsCacheCtx); } catch (Exception e) { throw new IgniteCheckedException("Failed to get sequence by name: " + name, e); } + finally { + atomicsCacheCtx.gate().leave(); + } } /** @@ -331,7 +275,11 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If removing failed. */ public final boolean removeSequence(String name) throws IgniteCheckedException { - waitInitialization(); + assert name != null; + + checkAtomicsConfiguration(); + + atomicsCacheCtx.gate().enter(); try { GridCacheInternal key = new GridCacheInternalKeyImpl(name); @@ -341,6 +289,9 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { catch (Exception e) { throw new IgniteCheckedException("Failed to remove sequence by name: " + name, e); } + finally { + atomicsCacheCtx.gate().leave(); + } } /** @@ -355,13 +306,15 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { */ public final IgniteAtomicLong atomicLong(final String name, final long initVal, final boolean create) throws IgniteCheckedException { - waitInitialization(); + A.notNull(name, "name"); checkAtomicsConfiguration(); - final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); + atomicsCacheCtx.gate().enter(); try { + final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); + // Check type of structure received by key from local cache. 
IgniteAtomicLong atomicLong = cast(dsMap.get(key), IgniteAtomicLong.class); @@ -369,9 +322,8 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { return atomicLong; return CU.outTx(new Callable<IgniteAtomicLong>() { - @Override - public IgniteAtomicLong call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(atomicsCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + @Override public IgniteAtomicLong call() throws Exception { + try (IgniteTx tx = CU.txStartInternal(atomicsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = cast(dsView.get(key), GridCacheAtomicLongValue.class); @@ -393,7 +345,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { dsView.putx(key, val); } - a = new GridCacheAtomicLongImpl(name, key, atomicLongView, atomicsCtx); + a = new GridCacheAtomicLongImpl(name, key, atomicLongView, atomicsCacheCtx); dsMap.put(key, a); @@ -409,11 +361,14 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { throw e; } } - }, atomicsCtx); + }, atomicsCacheCtx); } catch (Exception e) { throw new IgniteCheckedException("Failed to get atomic long by name: " + name, e); } + finally { + atomicsCacheCtx.gate().leave(); + } } /** @@ -424,7 +379,11 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If removing failed. 
*/ public final boolean removeAtomicLong(String name) throws IgniteCheckedException { - waitInitialization(); + assert name != null; + + checkAtomicsConfiguration(); + + atomicsCacheCtx.gate().enter(); try { GridCacheInternal key = new GridCacheInternalKeyImpl(name); @@ -434,6 +393,9 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { catch (Exception e) { throw new IgniteCheckedException("Failed to remove atomic long by name: " + name, e); } + finally { + atomicsCacheCtx.gate().leave(); + } } /** @@ -447,15 +409,20 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If loading failed. */ @SuppressWarnings("unchecked") - public final <T> IgniteAtomicReference<T> atomicReference(final String name, final T initVal, - final boolean create) throws IgniteCheckedException { - waitInitialization(); + public final <T> IgniteAtomicReference<T> atomicReference(final String name, + final T initVal, + final boolean create) + throws IgniteCheckedException + { + A.notNull(name, "name"); checkAtomicsConfiguration(); - final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); + atomicsCacheCtx.gate().enter(); try { + final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); + // Check type of structure received by key from local cache. 
IgniteAtomicReference atomicRef = cast(dsMap.get(key), IgniteAtomicReference.class); @@ -463,9 +430,8 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { return atomicRef; return CU.outTx(new Callable<IgniteAtomicReference<T>>() { - @Override - public IgniteAtomicReference<T> call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(atomicsCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + @Override public IgniteAtomicReference<T> call() throws Exception { + try (IgniteTx tx = CU.txStartInternal(atomicsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicReferenceValue val = cast(dsView.get(key), GridCacheAtomicReferenceValue.class); @@ -488,7 +454,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { dsView.putx(key, val); } - ref = new GridCacheAtomicReferenceImpl(name, key, atomicRefView, atomicsCtx); + ref = new GridCacheAtomicReferenceImpl(name, key, atomicRefView, atomicsCacheCtx); dsMap.put(key, ref); @@ -504,11 +470,15 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { throw e; } } - }, atomicsCtx); + }, atomicsCacheCtx); } catch (Exception e) { throw new IgniteCheckedException("Failed to get atomic reference by name: " + name, e); } + finally { + atomicsCacheCtx.gate().leave(); + + } } /** @@ -519,7 +489,11 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If removing failed. 
*/ public final boolean removeAtomicReference(String name) throws IgniteCheckedException { - waitInitialization(); + assert name != null; + + checkAtomicsConfiguration(); + + atomicsCacheCtx.gate().enter(); try { GridCacheInternal key = new GridCacheInternalKeyImpl(name); @@ -529,6 +503,10 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { catch (Exception e) { throw new IgniteCheckedException("Failed to remove atomic reference by name: " + name, e); } + finally { + atomicsCacheCtx.gate().leave(); + + } } /** @@ -546,13 +524,15 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { @SuppressWarnings("unchecked") public final <T, S> IgniteAtomicStamped<T, S> atomicStamped(final String name, final T initVal, final S initStamp, final boolean create) throws IgniteCheckedException { - waitInitialization(); + A.notNull(name, "name"); checkAtomicsConfiguration(); - final GridCacheInternalKeyImpl key = new GridCacheInternalKeyImpl(name); + atomicsCacheCtx.gate().enter(); try { + final GridCacheInternalKeyImpl key = new GridCacheInternalKeyImpl(name); + // Check type of structure received by key from local cache. 
IgniteAtomicStamped atomicStamped = cast(dsMap.get(key), IgniteAtomicStamped.class); @@ -560,9 +540,8 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { return atomicStamped; return CU.outTx(new Callable<IgniteAtomicStamped<T, S>>() { - @Override - public IgniteAtomicStamped<T, S> call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(atomicsCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + @Override public IgniteAtomicStamped<T, S> call() throws Exception { + try (IgniteTx tx = CU.txStartInternal(atomicsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicStampedValue val = cast(dsView.get(key), GridCacheAtomicStampedValue.class); @@ -585,7 +564,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { dsView.putx(key, val); } - stmp = new GridCacheAtomicStampedImpl(name, key, atomicStampedView, atomicsCtx); + stmp = new GridCacheAtomicStampedImpl(name, key, atomicStampedView, atomicsCacheCtx); dsMap.put(key, stmp); @@ -601,11 +580,15 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { throw e; } } - }, atomicsCtx); + }, atomicsCacheCtx); } catch (Exception e) { throw new IgniteCheckedException("Failed to get atomic stamped by name: " + name, e); } + finally { + atomicsCacheCtx.gate().leave(); + + } } /** @@ -616,7 +599,11 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If removing failed. 
*/ public final boolean removeAtomicStamped(String name) throws IgniteCheckedException { - waitInitialization(); + assert name != null; + + checkAtomicsConfiguration(); + + atomicsCacheCtx.gate().enter(); try { GridCacheInternal key = new GridCacheInternalKeyImpl(name); @@ -626,137 +613,56 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { catch (Exception e) { throw new IgniteCheckedException("Failed to remove atomic stamped by name: " + name, e); } + finally { + atomicsCacheCtx.gate().leave(); + } } /** * Gets a queue from cache or creates one if it's not cached. * * @param name Name of queue. + * @param cfg Queue configuration. * @param cap Max size of queue. - * @param colloc Collocation flag. * @param create If {@code true} queue will be created in case it is not in cache. * @return Instance of queue. * @throws IgniteCheckedException If failed. */ - public final <T> IgniteQueue<T> queue(final String name, final int cap, boolean colloc, - final boolean create) throws IgniteCheckedException { - waitInitialization(); - - // TODO IGNITE-6 - return null; - /* - checkSupportsQueue(); - - // Non collocated mode enabled only for PARTITIONED cache. - final boolean collocMode = cctx.cache().configuration().getCacheMode() != PARTITIONED || colloc; - - if (cctx.atomic()) - return queue0(name, cap, collocMode, create); - - return CU.outTx(new Callable<IgniteQueue<T>>() { - @Override public IgniteQueue<T> call() throws Exception { - return queue0(name, cap, collocMode, create); - } - }, cctx); - */ - } - - /** - * Gets or creates queue. - * - * @param name Queue name. - * @param cap Capacity. - * @param colloc Collocation flag. - * @param create If {@code true} queue will be created in case it is not in cache. - * @return Queue. - * @throws IgniteCheckedException If failed. 
- */ - @SuppressWarnings({"unchecked", "NonPrivateFieldAccessedInSynchronizedContext"}) - private <T> IgniteQueue<T> queue0(GridCacheContext cctx, - final String name, - final int cap, - boolean colloc, - final boolean create) + @SuppressWarnings("unchecked") + public final <T> IgniteQueue<T> queue(final String name, + @Nullable IgniteCollectionConfiguration cfg, + int cap, + boolean create) throws IgniteCheckedException { - GridCacheQueueHeaderKey key = new GridCacheQueueHeaderKey(name); - - GridCacheQueueHeader header; + A.notNull(name, "name"); if (create) { - header = new GridCacheQueueHeader(IgniteUuid.randomUuid(), cap, colloc, 0, 0, null); - - GridCacheQueueHeader old = queueHdrView.putIfAbsent(key, header); - - if (old != null) { - if (old.capacity() != cap || old.collocated() != colloc) - throw new IgniteCheckedException("Failed to create queue, queue with the same name but different " + - "configuration already exists [name=" + name + ']'); + A.notNull(cfg, "cfg"); - header = old; - } + if (cap <= 0) + cap = Integer.MAX_VALUE; } - else - header = queueHdrView.get(key); - - if (header == null) - return null; - - if (queueQryGuard.compareAndSet(false, true)) { - queueQry = (GridCacheContinuousQueryAdapter)cctx.cache().queries().createContinuousQuery(); - - queueQry.filter(new QueueHeaderPredicate()); - - queueQry.localCallback(new IgniteBiPredicate<UUID, Collection<GridCacheContinuousQueryEntry>>() { - @Override public boolean apply(UUID id, Collection<GridCacheContinuousQueryEntry> entries) { - if (!busyLock.enterBusy()) - return false; - - try { - for (GridCacheContinuousQueryEntry e : entries) { - GridCacheQueueHeaderKey key = (GridCacheQueueHeaderKey)e.getKey(); - GridCacheQueueHeader hdr = (GridCacheQueueHeader)e.getValue(); - - for (final GridCacheQueueProxy queue : queuesMap.values()) { - if (queue.name().equals(key.queueName())) { - if (hdr == null) { - GridCacheQueueHeader rmvd = (GridCacheQueueHeader)e.getOldValue(); - assert rmvd != null; + 
GridCacheAdapter cache = cacheForCollection(cfg); - if (rmvd.id().equals(queue.delegate().id())) { - queue.delegate().onRemoved(false); + GridCacheContext cctx = cache.context(); - queuesMap.remove(queue.delegate().id()); - } - } - else - queue.delegate().onHeaderChanged(hdr); - } - } - } + // Non collocated mode enabled only for PARTITIONED cache. + final boolean colloc = + create && (cctx.cache().configuration().getCacheMode() != PARTITIONED || cfg.isCollocated()); - return true; - } - finally { - busyLock.leaveBusy(); - } - } - }); + GridCacheQueueHeader hdr = cctx.dataStructures().queue(name, cap, colloc, create); - queueQry.execute(cctx.isLocal() || cctx.isReplicated() ? cctx.grid().forLocal() : null, - true, - false, - false, - true); - } + if (hdr == null) + return null; - GridCacheQueueProxy queue = queuesMap.get(header.id()); + GridCacheQueueProxy queue = queuesMap.get(hdr.id()); if (queue == null) { - queue = new GridCacheQueueProxy(cctx, cctx.atomic() ? new GridAtomicCacheQueueImpl<>(name, header, cctx) : - new GridTransactionalCacheQueueImpl<>(name, header, cctx)); + queue = new GridCacheQueueProxy(cctx, cctx.atomic() ? new GridAtomicCacheQueueImpl<>(name, hdr, cctx) : + new GridTransactionalCacheQueueImpl<>(name, hdr, cctx)); - GridCacheQueueProxy old = queuesMap.putIfAbsent(header.id(), queue); + GridCacheQueueProxy old = queuesMap.putIfAbsent(hdr.id(), queue); if (old != null) queue = old; @@ -766,63 +672,33 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { } /** - * Removes queue from cache. - * - * @param name Queue name. - * @param batchSize Batch size. - * @return Method returns {@code true} if queue has been removed and {@code false} if it's not cached. - * @throws IgniteCheckedException If removing failed. + * @param key Queue header key. + * @param hdr Current queue header. + * @param oldHdr Previous queue header value. 
*/ - public final boolean removeQueue(final String name, final int batchSize) throws IgniteCheckedException { - waitInitialization(); - - // TODO IGNITE-6 - return false; - /* - checkSupportsQueue(); - - if (cctx.atomic()) - return removeQueue0(name, batchSize); - - return CU.outTx(new Callable<Boolean>() { - @Override public Boolean call() throws Exception { - return removeQueue0(name, batchSize); + public void onQueueUpdated(GridCacheQueueHeaderKey key, + @Nullable GridCacheQueueHeader hdr, + @Nullable GridCacheQueueHeader oldHdr) { + for (final GridCacheQueueProxy queue : queuesMap.values()) { + if (queue.name().equals(key.queueName())) { + if (hdr == null) { + assert oldHdr != null; + + if (oldHdr.id().equals(queue.delegate().id())) { + queue.delegate().onRemoved(false); + + queuesMap.remove(queue.delegate().id()); + } + } + else + queue.delegate().onHeaderChanged(hdr); } - }, cctx); - */ - } - - /** - * @param cctx Cache context. - * @param name Queue name. - * @param batchSize Batch size. - * @return {@code True} if queue was removed. - * @throws IgniteCheckedException If failed. - */ - private boolean removeQueue0(GridCacheContext cctx, String name, final int batchSize) throws IgniteCheckedException { - GridCacheQueueHeader hdr = queueHdrView.remove(new GridCacheQueueHeaderKey(name)); - - if (hdr == null) - return false; - - if (hdr.empty()) - return true; - - GridCacheQueueAdapter.removeKeys(cctx.kernalContext().cache().jcache(cctx.cache().name()), - hdr.id(), - name, - hdr.collocated(), - hdr.head(), - hdr.tail(), - batchSize); - - return true; + } } /** * Gets or creates count down latch. If count down latch is not found in cache, * it is created using provided name and count parameter. - * <p> * * @param name Name of the latch. * @param cnt Initial count. @@ -834,17 +710,23 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { * {@code create} is false. * @throws IgniteCheckedException If operation failed. 
*/ - public IgniteCountDownLatch countDownLatch(final String name, final int cnt, final boolean autoDel, - final boolean create) throws IgniteCheckedException { - A.ensure(cnt >= 0, "count can not be negative"); + public IgniteCountDownLatch countDownLatch(final String name, + final int cnt, + final boolean autoDel, + final boolean create) + throws IgniteCheckedException + { + A.notNull(name, "name"); - waitInitialization(); + A.ensure(cnt >= 0, "count can not be negative"); checkAtomicsConfiguration(); - final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); + atomicsCacheCtx.gate().enter(); try { + final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); + // Check type of structure received by key from local cache. IgniteCountDownLatch latch = cast(dsMap.get(key), IgniteCountDownLatch.class); @@ -852,51 +734,54 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { return latch; return CU.outTx(new Callable<IgniteCountDownLatch>() { - @Override public IgniteCountDownLatch call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(atomicsCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheCountDownLatchValue val = cast(dsView.get(key), - GridCacheCountDownLatchValue.class); + @Override public IgniteCountDownLatch call() throws Exception { + try (IgniteTx tx = CU.txStartInternal(atomicsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheCountDownLatchValue val = cast(dsView.get(key), + GridCacheCountDownLatchValue.class); - // Check that count down hasn't been created in other thread yet. - GridCacheCountDownLatchEx latch = cast(dsMap.get(key), GridCacheCountDownLatchEx.class); + // Check that count down hasn't been created in other thread yet. 
+ GridCacheCountDownLatchEx latch = cast(dsMap.get(key), GridCacheCountDownLatchEx.class); - if (latch != null) { - assert val != null; + if (latch != null) { + assert val != null; - return latch; - } + return latch; + } - if (val == null && !create) - return null; + if (val == null && !create) + return null; - if (val == null) { - val = new GridCacheCountDownLatchValue(cnt, autoDel); + if (val == null) { + val = new GridCacheCountDownLatchValue(cnt, autoDel); - dsView.putx(key, val); - } + dsView.putx(key, val); + } - latch = new GridCacheCountDownLatchImpl(name, val.get(), val.initialCount(), - val.autoDelete(), key, cntDownLatchView, atomicsCtx); + latch = new GridCacheCountDownLatchImpl(name, val.get(), val.initialCount(), + val.autoDelete(), key, cntDownLatchView, atomicsCacheCtx); - dsMap.put(key, latch); + dsMap.put(key, latch); - tx.commit(); + tx.commit(); - return latch; - } - catch (Error | Exception e) { - dsMap.remove(key); + return latch; + } + catch (Error | Exception e) { + dsMap.remove(key); - U.error(log, "Failed to create count down latch: " + name, e); + U.error(log, "Failed to create count down latch: " + name, e); - throw e; - } + throw e; } - }, atomicsCtx); + } + }, atomicsCacheCtx); } catch (Exception e) { throw new IgniteCheckedException("Failed to get count down latch by name: " + name, e); } + finally { + atomicsCacheCtx.gate().leave(); + } } /** @@ -907,7 +792,11 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If operation failed. 
*/ public boolean removeCountDownLatch(final String name) throws IgniteCheckedException { - waitInitialization(); + assert name != null; + + checkAtomicsConfiguration(); + + atomicsCacheCtx.gate().enter(); try { return CU.outTx( @@ -915,7 +804,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { @Override public Boolean call() throws Exception { GridCacheInternal key = new GridCacheInternalKeyImpl(name); - try (IgniteTx tx = CU.txStartInternal(atomicsCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try (IgniteTx tx = CU.txStartInternal(atomicsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { // Check correctness type of removable object. GridCacheCountDownLatchValue val = cast(dsView.get(key), GridCacheCountDownLatchValue.class); @@ -942,11 +831,14 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { } } }, - atomicsCtx); + atomicsCacheCtx); } catch (Exception e) { throw new IgniteCheckedException("Failed to remove count down latch by name: " + name, e); } + finally { + atomicsCacheCtx.gate().leave(); + } } /** @@ -961,7 +853,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { return CU.outTx( new Callable<Boolean>() { @Override public Boolean call() throws Exception { - try (IgniteTx tx = CU.txStartInternal(atomicsCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try (IgniteTx tx = CU.txStartInternal(atomicsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { // Check correctness type of removable object. R val = cast(dsView.get(key), cls); @@ -982,7 +874,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { } } }, - atomicsCtx + atomicsCacheCtx ); } @@ -992,19 +884,10 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { * @param tx Committed transaction. 
*/ public <K, V> void onTxCommitted(IgniteTxEx<K, V> tx) { - if (atomicsCtx == null) + if (atomicsCacheCtx == null) return; - if (!atomicsCtx.isDht() && tx.internal() && (!atomicsCtx.isColocated() || atomicsCtx.isReplicated())) { - try { - waitInitialization(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to wait for manager initialization.", e); - - return; - } - + if (!atomicsCacheCtx.isDht() && tx.internal() && (!atomicsCacheCtx.isColocated() || atomicsCacheCtx.isReplicated())) { Collection<IgniteTxEntry<K, V>> entries = tx.writeEntries(); if (log.isDebugEnabled()) @@ -1027,7 +910,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { latch0.onUpdate(val.get()); if (val.get() == 0 && val.autoDelete()) { - entry.cached().markObsolete(atomicsCtx.versions().next()); + entry.cached().markObsolete(atomicsCacheCtx.versions().next()); dsMap.remove(key); @@ -1057,87 +940,39 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { } /** - * @throws IgniteCheckedException If thread is interrupted or manager - * was not successfully initialized. - */ - private void waitInitialization() throws IgniteCheckedException { - if (initLatch.getCount() > 0) - U.await(initLatch); - - if (!initFlag) - throw new IgniteCheckedException("DataStructures processor was not properly initialized."); - } - - /** - * @param cctx Cache context. - * @return {@code True} if {@link IgniteQueue} can be used with current cache configuration. - */ - private boolean supportsQueue(GridCacheContext cctx) { - return !(cctx.atomic() && !cctx.isLocal() && cctx.config().getAtomicWriteOrderMode() == CLOCK); - } - - /** - * @param cctx Cache context. - * @throws IgniteCheckedException If {@link IgniteQueue} can not be used with current cache configuration. 
- */ - private void checkSupportsQueue(GridCacheContext cctx) throws IgniteCheckedException { - if (cctx.atomic() && !cctx.isLocal() && cctx.config().getAtomicWriteOrderMode() == CLOCK) - throw new IgniteCheckedException("IgniteQueue can not be used with ATOMIC cache with CLOCK write order mode" + - " (change write order mode to PRIMARY in configuration)"); - } - - /** * Gets a set from cache or creates one if it's not cached. * * @param name Set name. - * @param collocated Collocation flag. + * @param cfg Set configuration. * @param create If {@code true} set will be created in case it is not in cache. * @return Set instance. * @throws IgniteCheckedException If failed. */ - @Nullable public <T> IgniteSet<T> set(final String name, boolean collocated, final boolean create) + @Nullable public <T> IgniteSet<T> set(final String name, + @Nullable IgniteCollectionConfiguration cfg, + final boolean create) throws IgniteCheckedException { - waitInitialization(); + A.notNull(name, "name"); - // TODO IGNITE-6. - return null; - /* - // Non collocated mode enabled only for PARTITIONED cache. - final boolean collocMode = cctx.cache().configuration().getCacheMode() != PARTITIONED || collocated; + if (create) + A.notNull(cfg, "cfg"); - if (cctx.atomic()) - return set0(name, collocMode, create); + GridCacheAdapter cache = cacheForCollection(cfg); - return CU.outTx(new Callable<IgniteSet<T>>() { - @Nullable @Override public IgniteSet<T> call() throws Exception { - return set0(name, collocMode, create); - } - }, cctx); - */ - } + final GridCacheContext cctx = cache.context(); - /** - * Removes set. - * - * @param name Set name. - * @return {@code True} if set was removed. - * @throws IgniteCheckedException If failed. - */ - public boolean removeSet(final String name) throws IgniteCheckedException { - waitInitialization(); + // Non collocated mode enabled only for PARTITIONED cache. 
+ final boolean colloc = + create && (cctx.cache().configuration().getCacheMode() != PARTITIONED || cfg.isCollocated()); - // TODO IGNITE-6. - return false; - /* if (cctx.atomic()) - return removeSet0(name); + return set0(cctx, name, colloc, create); - return CU.outTx(new Callable<Boolean>() { - @Override public Boolean call() throws Exception { - return removeSet0(name); + return CU.outTx(new Callable<IgniteSet<T>>() { + @Nullable @Override public IgniteSet<T> call() throws Exception { + return set0(cctx, name, colloc, create); } }, cctx); - */ } /** @@ -1155,37 +990,44 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { boolean create) throws IgniteCheckedException { - GridCacheSetHeaderKey key = new GridCacheSetHeaderKey(name); + cctx.gate().enter(); - GridCacheSetHeader hdr; + try { + GridCacheSetHeaderKey key = new GridCacheSetHeaderKey(name); - GridCacheAdapter cache = cctx.cache(); + GridCacheSetHeader hdr; - if (create) { - hdr = new GridCacheSetHeader(IgniteUuid.randomUuid(), collocated); + GridCacheAdapter cache = cctx.cache(); - GridCacheSetHeader old = retryPutIfAbsent(cache, key, hdr); + if (create) { + hdr = new GridCacheSetHeader(IgniteUuid.randomUuid(), collocated); - if (old != null) - hdr = old; - } - else - hdr = (GridCacheSetHeader)cache.get(key); + GridCacheSetHeader old = retryPutIfAbsent(cache, key, hdr); - if (hdr == null) - return null; + if (old != null) + hdr = old; + } + else + hdr = (GridCacheSetHeader)cache.get(key); - GridCacheSetProxy<T> set = setsMap.get(hdr.id()); + if (hdr == null) + return null; - if (set == null) { - GridCacheSetProxy<T> old = setsMap.putIfAbsent(hdr.id(), - set = new GridCacheSetProxy<>(cctx, new GridCacheSetImpl<T>(cctx, name, hdr))); + GridCacheSetProxy<T> set = setsMap.get(hdr.id()); - if (old != null) - set = old; - } + if (set == null) { + GridCacheSetProxy<T> old = setsMap.putIfAbsent(hdr.id(), + set = new GridCacheSetProxy<>(cctx, new GridCacheSetImpl<T>(cctx, name, hdr))); 
+ + if (old != null) + set = old; + } - return set; + return set; + } + finally { + cctx.gate().leave(); + } } /** @@ -1195,7 +1037,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - private boolean removeSet0(GridCacheContext cctx, String name) throws IgniteCheckedException { + public boolean removeSet(GridCacheContext cctx, String name) throws IgniteCheckedException { GridCacheSetHeaderKey key = new GridCacheSetHeaderKey(name); GridCache cache = cctx.cache(); @@ -1213,7 +1055,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { try { cctx.closures().callAsyncNoFailover(BROADCAST, - new BlockSetCallable(cctx.name(), hdr.id()), + new BlockSetCallable(hdr.id()), nodes, true).get(); } @@ -1417,46 +1259,40 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { @Override public void printMemoryStats() { X.println(">>> "); X.println(">>> Data structure processor memory stats [grid=" + ctx.gridName() + - ", cache=" + (atomicsCtx != null ? atomicsCtx.name() : null) + ']'); + ", cache=" + (atomicsCacheCtx != null ? atomicsCacheCtx.name() : null) + ']'); X.println(">>> dsMapSize: " + dsMap.size()); } /** - * @throws IgniteException If atomics configuration is not provided. + * @param cfg Collection configuration. + * @return Cache to use for collection. */ - private void checkAtomicsConfiguration() throws IgniteException { - if (cfg == null) - throw new IgniteException("Atomic data structure can be created, need to provide IgniteAtomicConfiguration."); - } + private GridCacheAdapter cacheForCollection(IgniteCollectionConfiguration cfg) { + // TODO IGNITE-29: start collection internal cache with required configuration or use existing one. + GridCacheAdapter cache = ctx.cache().internalCache("TEST_COLLECTION_CACHE"); - /** - * Predicate for queue continuous query. 
- */ - private static class QueueHeaderPredicate implements IgniteBiPredicate, Externalizable { - /** */ - private static final long serialVersionUID = 0L; + if (cache == null) + throw new IgniteException("TEST_COLLECTION_CACHE is not configured."); - /** - * Required by {@link Externalizable}. - */ - public QueueHeaderPredicate() { - // No-op. - } + if (cfg != null) { + CacheConfiguration ccfg = cache.configuration(); - /** {@inheritDoc} */ - @Override public boolean apply(Object key, Object val) { - return key instanceof GridCacheQueueHeaderKey; + assert ccfg.getCacheMode() == cfg.getCacheMode(); + assert ccfg.getAtomicityMode() == cfg.getAtomicityMode(); + assert ccfg.getMemoryMode() == cfg.getMemoryMode(); + assert ccfg.getDistributionMode() == cfg.getDistributionMode(); } - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) { - // No-op. - } + return cache; + } - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) { - // No-op. - } + /** + * @throws IgniteException If atomics configuration is not provided. + */ + private void checkAtomicsConfiguration() throws IgniteException { + if (atomicCfg == null) + throw new IgniteException("Atomic data structure can not be created, " + + "need to provide IgniteAtomicConfiguration."); } /** @@ -1465,16 +1301,13 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { @GridInternal private static class BlockSetCallable implements Callable<Void>, Externalizable { /** */ - private static final long serialVersionUID = -8892474927216478231L; + private static final long serialVersionUID = 0; /** Injected grid instance. */ @IgniteInstanceResource private Ignite ignite; /** */ - private String cacheName; - - /** */ private IgniteUuid setId; /** @@ -1485,35 +1318,28 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { } /** - * @param cacheName Cache name. * @param setId Set ID. 
*/ - private BlockSetCallable(String cacheName, IgniteUuid setId) { - this.cacheName = cacheName; + private BlockSetCallable(IgniteUuid setId) { this.setId = setId; } /** {@inheritDoc} */ @Override public Void call() throws IgniteCheckedException { - GridCacheAdapter cache = ((GridKernal)ignite).context().cache().internalCache(cacheName); + assert ignite != null; - assert cache != null; - - // TODO IGNITE-6 - // cache.context().dataStructures().blockSet(setId); + ((GridKernal)ignite).context().dataStructures().blockSet(setId); return null; } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); U.writeGridUuid(out, setId); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); setId = U.readGridUuid(in); } @@ -1564,12 +1390,22 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public Void call() throws IgniteCheckedException { - GridCacheAdapter cache = ((GridKernal) ignite).context().cache().internalCache(cacheName); + assert ignite != null; + + GridCacheAdapter cache = ((GridKernal)ignite).context().cache().internalCache(cacheName); assert cache != null; - // TODO IGNITE-6 - // cache.context().dataStructures().removeSetData(setId, topVer); + GridCacheGateway gate = cache.context().gate(); + + gate.enter(); + + try { + ((GridKernal)ignite).context().dataStructures().removeSetData(cache.context(), setId, topVer); + } + finally { + gate.leave(); + } return null; } @@ -1593,4 +1429,214 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { return "RemoveSetCallable [setId=" + setId + ']'; } } + + /** + * + */ + static enum DataStructureType { + /** */ + ATOMIC_LONG(IgniteAtomicLong.class.getSimpleName()), + + /** */ + ATOMIC_REF(IgniteAtomicReference.class.getSimpleName()), + + /** */ + 
ATOMIC_SEQ(IgniteAtomicSequence.class.getSimpleName()), + + /** */ + ATOMIC_STAMPED(IgniteAtomicStamped.class.getSimpleName()), + + /** */ + QUEUE(IgniteQueue.class.getSimpleName()), + + /** */ + SET(IgniteSet.class.getSimpleName()); + + /** */ + private static final DataStructureType[] VALS = values(); + + /** */ + private String name; + + /** + * @param name Name. + */ + DataStructureType(String name) { + this.name = name; + } + + /** + * @return Data structure public class name. + */ + public String className() { + return name; + } + + /** + * @param ord Ordinal value. + * @return Enumerated value or {@code null} if ordinal out of range. + */ + @Nullable public static DataStructureType fromOrdinal(int ord) { + return ord >= 0 && ord < VALS.length ? VALS[ord] : null; + } + } + + /** + * + */ + static class DataStructureInfo implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private String name; + + /** */ + private DataStructureType type; + + /** */ + private Object info; + + /** + * Required by {@link Externalizable}. + */ + public DataStructureInfo() { + // No-op. + } + + /** + * @param name Data structure name. + * @param type Data structure type. + * @param info Data structure information. + */ + DataStructureInfo(String name, DataStructureType type, Externalizable info) { + this.name = name; + this.type = type; + this.info = info; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, name); + + U.writeEnum(out, type); + + out.writeObject(info); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + name = U.readString(in); + + type = DataStructureType.fromOrdinal(in.readByte()); + + info = in.readObject(); + } + } + + /** + * @param info New data structure information. 
+ */ + private void validateDataStructure(DataStructureInfo info) { + Map<String, DataStructureInfo> map = utilityCache.get(DATA_STRUCTURES_KEY); + + if (map != null) { + DataStructureInfo oldInfo = map.get(info.name); + + if (oldInfo != null) { + IgniteException err = validateDataStructure(oldInfo, info); + + if (err != null) + throw err; + } + } + } + + /** + * @param oldInfo Existing data structure information. + * @param info New data structure information. + * @return {@link IgniteException} if validation failed. + */ + @Nullable private static IgniteException validateDataStructure(DataStructureInfo oldInfo, DataStructureInfo info) { + if (oldInfo.type != info.type) { + return new IgniteException("Another data structure with the same name already created " + + "[name= " + info.name + + ", new= " + info.type.className() + + ", existing=" + oldInfo.type.className() + ']'); + } + + return null; + } + + /** + * + */ + static class AddAtomicProcessor implements + EntryProcessor<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>, IgniteException>, + Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private DataStructureInfo info; + + /** + * @param info Data structure information. + */ + AddAtomicProcessor(DataStructureInfo info) { + this.info = info; + } + + /** + * Required by {@link Externalizable}. + */ + public AddAtomicProcessor() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteException process( + MutableEntry<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> entry, + Object... 
args) + throws EntryProcessorException + { + Map<String, DataStructureInfo> map = entry.getValue(); + + if (map == null) { + map = new HashMap<>(); + + map.put(info.name, info); + + entry.setValue(map); + + return null; + } + + DataStructureInfo oldInfo = map.get(info.name); + + if (oldInfo == null) { + map = new HashMap<>(map); + + map.put(info.name, info); + + entry.setValue(map); + + return null; + } + + return validateDataStructure(oldInfo, info); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + info.writeExternal(out); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + info = new DataStructureInfo(); + + info.readExternal(in); + } + } }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java index 12bd7d4..3e07fff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java @@ -82,9 +82,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext /** Callable for {@link #incrementAndGet()}. */ private final Callable<Long> incAndGetCall = new Callable<Long>() { @Override public Long call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); - - try { + try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); if (val == null) @@ -104,8 +102,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext U.error(log, "Failed to increment and get: " + this, e); throw e; - } finally { - tx.close(); } } }; @@ -113,9 +109,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext /** Callable for {@link #getAndIncrement()}. 
*/ private final Callable<Long> getAndIncCall = new Callable<Long>() { @Override public Long call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); - - try { + try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); if (val == null) @@ -135,8 +129,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext U.error(log, "Failed to get and increment: " + this, e); throw e; - } finally { - tx.close(); } } }; @@ -144,9 +136,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext /** Callable for {@link #decrementAndGet()}. */ private final Callable<Long> decAndGetCall = new Callable<Long>() { @Override public Long call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); - - try { + try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); if (val == null) @@ -166,8 +156,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext U.error(log, "Failed to decrement and get: " + this, e); throw e; - } finally { - tx.close(); } } }; @@ -175,9 +163,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext /** Callable for {@link #getAndDecrement()}. 
*/ private final Callable<Long> getAndDecCall = new Callable<Long>() { @Override public Long call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); - - try { + try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); if (val == null) @@ -197,8 +183,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext U.error(log, "Failed to get and decrement and get: " + this, e); throw e; - } finally { - tx.close(); } } }; @@ -298,6 +282,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext @Override public boolean compareAndSet(long expVal, long newVal) throws IgniteCheckedException { checkRemoved(); + return CU.outTx(internalCompareAndSet(expVal, newVal), ctx); } @@ -331,6 +316,19 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext return rmvd; } + /** {@inheritDoc} */ + @Override public void close() { + if (rmvd) + return; + + try { + ctx.kernalContext().dataStructures().removeAtomicLong(name); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + /** * Method returns callable for execution {@link #addAndGet(long)} operation in async and sync mode. 
* @@ -340,9 +338,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext private Callable<Long> internalAddAndGet(final long l) { return new Callable<Long>() { @Override public Long call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); - - try { + try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); if (val == null) @@ -362,8 +358,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext U.error(log, "Failed to add and get: " + this, e); throw e; - } finally { - tx.close(); } } }; @@ -378,9 +372,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext private Callable<Long> internalGetAndAdd(final long l) { return new Callable<Long>() { @Override public Long call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); - - try { + try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); if (val == null) @@ -400,8 +392,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext U.error(log, "Failed to get and add: " + this, e); throw e; - } finally { - tx.close(); } } }; @@ -416,9 +406,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext private Callable<Long> internalGetAndSet(final long l) { return new Callable<Long>() { @Override public Long call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); - - try { + try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); if (val == null) @@ -438,8 +426,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext U.error(log, "Failed to get and set: " + 
this, e); throw e; - } finally { - tx.close(); } } }; @@ -456,9 +442,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext private Callable<Boolean> internalCompareAndSet(final long expVal, final long newVal) { return new Callable<Boolean>() { @Override public Boolean call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); - - try { + try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); if (val == null) @@ -480,8 +464,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext U.error(log, "Failed to compare and set: " + this, e); throw e; - } finally { - tx.close(); } } }; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java index 10a0708..b68ce99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java @@ -157,6 +157,19 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef return rmvd; } + /** {@inheritDoc} */ + @Override public void close() { + if (rmvd) + return; + + try { + ctx.kernalContext().dataStructures().removeAtomicReference(name); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + /** * Method make wrapper predicate for existing value. 
* @@ -194,10 +207,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef private Callable<Boolean> internalSet(final T val) { return new Callable<Boolean>() { @Override public Boolean call() throws Exception { - - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); - - try { + try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); if (ref == null) @@ -215,8 +225,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef U.error(log, "Failed to set value [val=" + val + ", atomicReference=" + this + ']', e); throw e; - } finally { - tx.close(); } } }; @@ -234,9 +242,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef final IgniteClosure<T, T> newValClos) { return new Callable<Boolean>() { @Override public Boolean call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); - - try { + try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); if (ref == null) @@ -262,8 +268,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef newValClos + ", atomicReference" + this + ']', e); throw e; - } finally { - tx.close(); } } }; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java index 36214e5..94960f0 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java @@ -196,6 +196,19 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt return rmvd; } + /** {@inheritDoc} */ + @Override public void close() { + if (rmvd) + return; + + try { + ctx.kernalContext().dataStructures().removeAtomicStamped(name); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + /** * Method make wrapper closure for existing value. * @@ -220,10 +233,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt private Callable<Boolean> internalSet(final T val, final S stamp) { return new Callable<Boolean>() { @Override public Boolean call() throws Exception { - - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); - - try { + try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); if (stmp == null) @@ -241,8 +251,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt U.error(log, "Failed to set [val=" + val + ", stamp=" + stamp + ", atomicStamped=" + this + ']', e); throw e; - } finally { - tx.close(); } } }; @@ -263,9 +271,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt final IgniteClosure<S, S> newStampClos) { return new Callable<Boolean>() { @Override public Boolean call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ); - - try { + try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); if (stmp == null) @@ -292,8 +298,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt 
", atomicStamped=" + this + ']', e); throw e; - } finally { - tx.close(); } } }; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java index 573b2be..d9b41a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java @@ -103,8 +103,14 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc * @param latchView Latch projection. * @param ctx Cache context. 
*/ - public GridCacheCountDownLatchImpl(String name, int cnt, int initCnt, boolean autoDel, GridCacheInternalKey key, - CacheProjection<GridCacheInternalKey, GridCacheCountDownLatchValue> latchView, GridCacheContext ctx) { + public GridCacheCountDownLatchImpl(String name, + int cnt, + int initCnt, + boolean autoDel, + GridCacheInternalKey key, + CacheProjection<GridCacheInternalKey, GridCacheCountDownLatchValue> latchView, + GridCacheContext ctx) + { assert name != null; assert cnt >= 0; assert initCnt >= 0; @@ -220,9 +226,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc internalLatch = CU.outTx( new Callable<CountDownLatch>() { @Override public CountDownLatch call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ); - - try { + try (IgniteTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheCountDownLatchValue val = latchView.get(key); if (val == null) { @@ -238,9 +242,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc return new CountDownLatch(val.get()); } - finally { - tx.close(); - } } }, ctx @@ -262,6 +263,19 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc } /** {@inheritDoc} */ + @Override public void close() { + if (rmvd) + return; + + try { + ctx.kernalContext().dataStructures().removeCountDownLatch(name); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(ctx.kernalContext()); out.writeUTF(name); @@ -319,9 +333,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc /** {@inheritDoc} */ @Override public Integer call() throws Exception { - IgniteTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ); - - try { + try (IgniteTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, 
REPEATABLE_READ)) { GridCacheCountDownLatchValue latchVal = latchView.get(key); if (latchVal == null) { @@ -352,9 +364,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc return retVal; } - finally { - tx.close(); - } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java index 33b5279..8d6282d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java @@ -329,19 +329,14 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp @Override public void clear(int batchSize) throws IgniteException { A.ensure(batchSize >= 0, "Batch size cannot be negative: " + batchSize); - try { - IgniteBiTuple<Long, Long> t = (IgniteBiTuple<Long, Long>)cache.invoke(queueKey, new ClearProcessor(id)); + IgniteBiTuple<Long, Long> t = (IgniteBiTuple<Long, Long>)cache.invoke(queueKey, new ClearProcessor(id)); - if (t == null) - return; + if (t == null) + return; - checkRemoved(t.get1()); + checkRemoved(t.get1()); - removeKeys(cache, id, queueName, collocated, t.get1(), t.get2(), batchSize); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + removeKeys(id, queueName, collocated, t.get1(), t.get2(), batchSize); } /** {@inheritDoc} */ @@ -371,24 +366,21 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp } /** - * @param cache Cache. * @param id Queue unique ID. 
* @param name Queue name. * @param collocated Collocation flag. * @param startIdx Start item index. * @param endIdx End item index. * @param batchSize Batch size. - * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - static void removeKeys(IgniteCache cache, + private void removeKeys( IgniteUuid id, String name, boolean collocated, long startIdx, long endIdx, int batchSize) - throws IgniteCheckedException { Set<GridCacheQueueItemKey> keys = new HashSet<>(batchSize > 0 ? batchSize : 10); @@ -510,6 +502,27 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp return itemKey(id, queueName, collocated(), idx); } + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + public void close() { + if (rmvd) + return; + + GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.getAndRemove(new GridCacheQueueHeaderKey(queueName)); + + rmvd = true; + + if (hdr == null || hdr.empty()) + return; + + removeKeys(hdr.id(), + queueName, + hdr.collocated(), + hdr.head(), + hdr.tail(), + 0); + } + /** * @param id Queue unique ID. * @param queueName Queue name. 
@@ -532,6 +545,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp } /** + * */ private class QueueIterator implements Iterator<T> { /** */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java index c0ced41..1f1c62a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java @@ -663,6 +663,31 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable { } /** {@inheritDoc} */ + @Override public void close() { + gate.enter(); + + try { + if (cctx.transactional()) { + CU.outTx(new Callable<Void>() { + @Override public Void call() throws Exception { + delegate.close(); + + return null; + } + }, cctx); + } + else + delegate.close(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + gate.leave(); + } + } + + /** {@inheritDoc} */ @Override public String name() { return delegate.name(); } @@ -729,7 +754,7 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable { try { IgniteBiTuple<GridKernalContext, String> t = stash.get(); - return t.get1().dataStructures().queue(t.get2(), 0, false, false); + return t.get1().dataStructures().queue(t.get2(), null, 0, false); } catch (IgniteCheckedException e) { throw U.withCause(new InvalidObjectException(e.getMessage()), e); 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index 363b77f..69f79dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -325,6 +325,19 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite } /** {@inheritDoc} */ + @Override public void close() { + try { + if (rmvd) + return; + + ctx.kernalContext().dataStructures().removeSet(ctx, name); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ @SuppressWarnings("unchecked") private GridCloseableIterator<T> iterator0() { try { @@ -360,6 +373,9 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite throw new IgniteException(e); } } + + + /** * @param call Callable. * @return Callable result. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java index 59ca3f8..b199526 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java @@ -468,6 +468,38 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable { } /** {@inheritDoc} */ + @Override public void close() { + enterBusy(); + + try { + gate.enter(); + + try { + if (cctx.transactional()) { + CU.outTx(new Callable<Void>() { + @Override public Void call() throws Exception { + delegate.close(); + + return null; + } + }, cctx); + } + else + delegate.close(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + gate.leave(); + } + } + finally { + leaveBusy(); + } + } + + /** {@inheritDoc} */ @Override public String name() { return delegate.name(); } @@ -521,7 +553,7 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable { try { IgniteBiTuple<GridKernalContext, String> t = stash.get(); - return t.get1().dataStructures().set(t.get2(), false, false); + return t.get1().dataStructures().set(t.get2(), null, false); } catch (IgniteCheckedException e) { throw U.withCause(new InvalidObjectException(e.getMessage()), e);