# ignite-80
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/821c016b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/821c016b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/821c016b Branch: refs/heads/ignite-80 Commit: 821c016b11919ad98d0c7caaec938bc4506a32f6 Parents: ae43a30 Author: sboikov <sboi...@gridgain.com> Authored: Tue Apr 7 13:22:14 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Apr 7 17:39:47 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/IgnitionEx.java | 18 +- .../datastructures/DataStructuresProcessor.java | 450 +++++++------------ .../IgniteCacheDataStructuresSelfTestSuite.java | 5 +- 3 files changed, 178 insertions(+), 295 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/821c016b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index fe81006..f7e19d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -1679,8 +1679,6 @@ public class IgnitionEx { final boolean hasHadoop = IgniteComponentType.HADOOP.inClassPath(); - final boolean hasAtomics = cfg.getAtomicConfiguration() != null; - final boolean clientDisco = cfg.getDiscoverySpi() instanceof TcpClientDiscoverySpi; CacheConfiguration[] copies; @@ -1709,14 +1707,11 @@ public class IgnitionEx { "\" because it is reserved for internal purposes."); } - int addCacheCnt = 2; // Always add marshaller and utility caches. + int addCacheCnt = 3; // Always add marshaller, utility, atomics caches. if (hasHadoop) addCacheCnt++; - if (hasAtomics) - addCacheCnt++; - copies = new CacheConfiguration[cacheCfgs.length + addCacheCnt]; int cloneIdx = 2; @@ -1724,21 +1719,17 @@ public class IgnitionEx { if (hasHadoop) copies[cloneIdx++] = CU.hadoopSystemCache(); - if (hasAtomics) - copies[cloneIdx++] = atomicsSystemCache(cfg.getAtomicConfiguration(), clientDisco); + copies[cloneIdx++] = atomicsSystemCache(cfg.getAtomicConfiguration(), clientDisco); for (CacheConfiguration ccfg : cacheCfgs) copies[cloneIdx++] = new CacheConfiguration(ccfg); } else { - int cacheCnt = 2; // Always add marshaller and utility caches. + int cacheCnt = 3; // Always add marshaller, utility, atomics caches. if (hasHadoop) cacheCnt++; - if (hasAtomics) - cacheCnt++; - copies = new CacheConfiguration[cacheCnt]; int cacheIdx = 2; @@ -1746,8 +1737,7 @@ public class IgnitionEx { if (hasHadoop) copies[cacheIdx++] = CU.hadoopSystemCache(); - if (hasAtomics) - copies[cacheIdx] = atomicsSystemCache(cfg.getAtomicConfiguration(), clientDisco); + copies[cacheIdx] = atomicsSystemCache(cfg.getAtomicConfiguration(), clientDisco); } // Always add marshaller and utility caches. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/821c016b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 1ef5c55..3b20372 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -88,7 +88,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { private final AtomicConfiguration atomicCfg; /** */ - private GridCacheProjectionEx<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> utilityCache; + private GridCacheProjectionEx<CacheDataStructuresConfigurationKey, Map<String, DataStructureInfo>> dsInfoView; /** * @param ctx Context. @@ -107,29 +107,25 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { if (ctx.config().isDaemon()) return; - utilityCache = (GridCacheProjectionEx)ctx.cache().utilityCache(); + GridCache atomicsCache = ctx.cache().atomicsCache(); - assert utilityCache != null; + assert atomicsCache != null; - if (atomicCfg != null) { - GridCache atomicsCache = ctx.cache().atomicsCache(); + dsInfoView = (GridCacheProjectionEx)atomicsCache; - assert atomicsCache != null; + dsView = atomicsCache; - dsView = atomicsCache; + cntDownLatchView = atomicsCache; - cntDownLatchView = atomicsCache; + atomicLongView = atomicsCache; - atomicLongView = atomicsCache; + atomicRefView = atomicsCache; - atomicRefView = atomicsCache; + atomicStampedView = atomicsCache; - atomicStampedView = atomicsCache; + seqView = atomicsCache; - seqView = atomicsCache; - - dsCacheCtx = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME).context(); - } + dsCacheCtx = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME).context(); } /** @@ -150,13 +146,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); - return getAtomic(new IgniteOutClosureX<IgniteAtomicSequence>() { - @Override public IgniteAtomicSequence applyx() throws IgniteCheckedException { + return getAtomic(new IgniteClosureX<IgniteInternalTx, IgniteAtomicSequence>() { + @Override public IgniteAtomicSequence applyx(IgniteInternalTx tx) throws IgniteCheckedException { GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); - dsCacheCtx.gate().enter(); - - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try { GridCacheAtomicSequenceValue seqVal = cast(dsView.get(key), GridCacheAtomicSequenceValue.class); // Check that sequence hasn't been created in other thread yet. @@ -209,8 +203,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsMap.put(key, seq); - tx.commit(); - return seq; } catch (Error | Exception e) { @@ -220,9 +212,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { throw e; } - finally { - dsCacheCtx.gate().leave(); - } } }, new DataStructureInfo(name, ATOMIC_SEQ, null), create, IgniteAtomicSequence.class); } @@ -238,21 +227,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); - removeDataStructure(new IgniteCallable<Void>() { - @Override public Void call() throws Exception { - dsCacheCtx.gate().enter(); - - try { - GridCacheInternal key = new GridCacheInternalKeyImpl(name); + removeDataStructure(new IgniteClosureX<IgniteInternalTx, Void>() { + @Override public Void applyx(IgniteInternalTx tx) throws IgniteCheckedException { + GridCacheInternal key = new GridCacheInternalKeyImpl(name); - removeInternal(key, GridCacheAtomicSequenceValue.class); - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to remove sequence by name: " + name, e); - } - finally { - dsCacheCtx.gate().leave(); - } + removeInternal(tx, key, GridCacheAtomicSequenceValue.class); return null; } @@ -276,13 +255,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); - return getAtomic(new IgniteOutClosureX<IgniteAtomicLong>() { - @Override public IgniteAtomicLong applyx() throws IgniteCheckedException { + return getAtomic(new IgniteClosureX<IgniteInternalTx, IgniteAtomicLong>() { + @Override public IgniteAtomicLong applyx(IgniteInternalTx tx) throws IgniteCheckedException { final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); - dsCacheCtx.gate().enter(); - - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try { GridCacheAtomicLongValue val = cast(dsView.get(key), GridCacheAtomicLongValue.class); // Check that atomic long hasn't been created in other thread yet. @@ -307,8 +284,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsMap.put(key, a); - tx.commit(); - return a; } catch (Error | Exception e) { @@ -318,9 +293,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { throw e; } - finally { - dsCacheCtx.gate().leave(); - } } }, new DataStructureInfo(name, ATOMIC_LONG, null), create, IgniteAtomicLong.class); } @@ -333,49 +305,51 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * @return Data structure instance. * @throws IgniteCheckedException If failed. */ - @Nullable private <T> T getAtomic(final IgniteOutClosureX<T> c, + @Nullable private <T> T getAtomic(final IgniteClosureX<IgniteInternalTx, T> c, DataStructureInfo dsInfo, boolean create, Class<? extends T> cls) throws IgniteCheckedException { - Map<String, DataStructureInfo> dsMap = utilityCache.get(DATA_STRUCTURES_KEY); + dsCacheCtx.gate().enter(); - if (!create && (dsMap == null || !dsMap.containsKey(dsInfo.name))) - return null; + try { + Map<String, DataStructureInfo> dsMap = dsInfoView.get(DATA_STRUCTURES_KEY); - IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, create); + if (!create && (dsMap == null || !dsMap.containsKey(dsInfo.name))) + return null; - if (err != null) - throw err; + IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, create); - final GridCacheInternalKey key = new GridCacheInternalKeyImpl(dsInfo.name); + if (err != null) + throw err; - // Check type of structure received by key from local cache. - T dataStructure = cast(this.dsMap.get(key), cls); + final GridCacheInternalKey key = new GridCacheInternalKeyImpl(dsInfo.name); - if (dataStructure != null) - return dataStructure; + // Check type of structure received by key from local cache. + T dataStructure = cast(this.dsMap.get(key), cls); - if (!create) - return c.applyx(); + if (dataStructure != null) + return dataStructure; - try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { - err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get(); + try (IgniteInternalTx tx = dsInfoView.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + if (create) { + err = dsInfoView.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get(); - if (err != null) - throw err; - - dataStructure = ctx.closure().callLocalSafe(new Callable<T>() { - @Override public T call() throws Exception { - return c.applyx(); + if (err != null) + throw err; } - }, false).get(); - tx.commit(); - } + dataStructure = c.applyx(tx); - return dataStructure; + tx.commit(); + } + + return dataStructure; + } + finally { + dsCacheCtx.gate().leave(); + } } /** @@ -388,19 +362,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { assert name != null; assert dsCacheCtx != null; - removeDataStructure(new IgniteCallable<Void>() { - @Override public Void call() throws Exception { - dsCacheCtx.gate().enter(); - - try { - removeInternal(new GridCacheInternalKeyImpl(name), GridCacheAtomicLongValue.class); - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to remove atomic long by name: " + name, e); - } - finally { - dsCacheCtx.gate().leave(); - } + removeDataStructure(new IgniteClosureX<IgniteInternalTx, Void>() { + @Override public Void applyx(IgniteInternalTx tx) throws IgniteCheckedException { + removeInternal(tx, new GridCacheInternalKeyImpl(name), GridCacheAtomicLongValue.class); return null; } @@ -414,45 +378,52 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * @param afterRmv Optional closure to run after data structure removed. * @throws IgniteCheckedException If failed. */ - private <T> void removeDataStructure(IgniteCallable<T> c, + private <T> void removeDataStructure(IgniteClosureX<IgniteInternalTx, T> c, String name, DataStructureType type, @Nullable IgniteInClosureX<T> afterRmv) throws IgniteCheckedException { - Map<String, DataStructureInfo> dsMap = utilityCache.get(DATA_STRUCTURES_KEY); - - if (dsMap == null || !dsMap.containsKey(name)) - return; - - DataStructureInfo dsInfo = new DataStructureInfo(name, type, null); + T rmvInfo; - IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, false); + dsCacheCtx.gate().enter(); - if (err != null) - throw err; + try { + Map<String, DataStructureInfo> dsMap = dsInfoView.get(DATA_STRUCTURES_KEY); - T rmvInfo; + if (dsMap == null || !dsMap.containsKey(name)) + return; - try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { - T2<Boolean, IgniteCheckedException> res = - utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get(); + DataStructureInfo dsInfo = new DataStructureInfo(name, type, null); - err = res.get2(); + IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, false); if (err != null) throw err; - assert res.get1() != null; + try (IgniteInternalTx tx = dsInfoView.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + T2<Boolean, IgniteCheckedException> res = + dsInfoView.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get(); - boolean exists = res.get1(); + err = res.get2(); - if (!exists) - return; + if (err != null) + throw err; + + assert res.get1() != null; - rmvInfo = ctx.closure().callLocalSafe(c, false).get(); + boolean exists = res.get1(); - tx.commit(); + if (!exists) + return; + + rmvInfo = c.applyx(tx); + + tx.commit(); + } + } + finally { + dsCacheCtx.gate().leave(); } if (afterRmv != null && rmvInfo != null) @@ -479,13 +450,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); - return getAtomic(new IgniteOutClosureX<IgniteAtomicReference>() { - @Override public IgniteAtomicReference<T> applyx() throws IgniteCheckedException { + return getAtomic(new IgniteClosureX<IgniteInternalTx, IgniteAtomicReference>() { + @Override public IgniteAtomicReference<T> applyx(IgniteInternalTx tx) throws IgniteCheckedException { GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); - dsCacheCtx.gate().enter(); - - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try { GridCacheAtomicReferenceValue val = cast(dsView.get(key), GridCacheAtomicReferenceValue.class); @@ -512,8 +481,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsMap.put(key, ref); - tx.commit(); - return ref; } catch (Error | Exception e) { @@ -523,9 +490,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { throw e; } - finally { - dsCacheCtx.gate().leave(); - } } }, new DataStructureInfo(name, ATOMIC_REF, null), create, IgniteAtomicReference.class); } @@ -540,21 +504,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { assert name != null; assert dsCacheCtx != null; - removeDataStructure(new IgniteCallable<Void>() { - @Override public Void call() throws Exception { - dsCacheCtx.gate().enter(); - - try { - GridCacheInternal key = new GridCacheInternalKeyImpl(name); + removeDataStructure(new IgniteClosureX<IgniteInternalTx, Void>() { + @Override public Void applyx(IgniteInternalTx tx) throws IgniteCheckedException { + GridCacheInternal key = new GridCacheInternalKeyImpl(name); - removeInternal(key, GridCacheAtomicReferenceValue.class); - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to remove atomic reference by name: " + name, e); - } - finally { - dsCacheCtx.gate().leave(); - } + removeInternal(tx, key, GridCacheAtomicReferenceValue.class); return null; } @@ -580,15 +534,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); - return getAtomic(new IgniteOutClosureX<IgniteAtomicStamped>() { - @Override public IgniteAtomicStamped<T, S> applyx() throws IgniteCheckedException { + return getAtomic(new IgniteClosureX<IgniteInternalTx, IgniteAtomicStamped>() { + @Override public IgniteAtomicStamped<T, S> applyx(IgniteInternalTx tx) throws IgniteCheckedException { GridCacheInternalKeyImpl key = new GridCacheInternalKeyImpl(name); - dsCacheCtx.gate().enter(); - - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicStampedValue val = cast(dsView.get(key), - GridCacheAtomicStampedValue.class); + try { + GridCacheAtomicStampedValue val = cast(dsView.get(key), GridCacheAtomicStampedValue.class); // Check that atomic stamped hasn't been created in other thread yet. GridCacheAtomicStampedEx stmp = cast(dsMap.get(key), @@ -613,8 +564,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsMap.put(key, stmp); - tx.commit(); - return stmp; } catch (Error | Exception e) { @@ -624,9 +573,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { throw e; } - finally { - dsCacheCtx.gate().leave(); - } } }, new DataStructureInfo(name, ATOMIC_STAMPED, null), create, IgniteAtomicStamped.class); } @@ -641,21 +587,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { assert name != null; assert dsCacheCtx != null; - removeDataStructure(new IgniteCallable<Void>() { - @Override public Void call() throws Exception { - dsCacheCtx.gate().enter(); + removeDataStructure(new IgniteClosureX<IgniteInternalTx, Void>() { + @Override public Void applyx(IgniteInternalTx tx) throws IgniteCheckedException { + GridCacheInternal key = new GridCacheInternalKeyImpl(name); - try { - GridCacheInternal key = new GridCacheInternalKeyImpl(name); - - removeInternal(key, GridCacheAtomicStampedValue.class); - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to remove atomic stamped by name: " + name, e); - } - finally { - dsCacheCtx.gate().leave(); - } + removeInternal(tx, key, GridCacheAtomicStampedValue.class); return null; } @@ -712,11 +648,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { assert name != null; assert cctx != null; - IgniteCallable<GridCacheQueueHeader> rmv = new IgniteCallable<GridCacheQueueHeader>() { - @Override public GridCacheQueueHeader call() throws Exception { - return (GridCacheQueueHeader)retryRemove(cctx.cache(), new GridCacheQueueHeaderKey(name)); - } - }; + IgniteClosureX<IgniteInternalTx, GridCacheQueueHeader> rmv = + new IgniteClosureX<IgniteInternalTx, GridCacheQueueHeader>() { + @Override public GridCacheQueueHeader applyx(IgniteInternalTx tx) throws IgniteCheckedException { + return (GridCacheQueueHeader)cctx.cache().remove(new GridCacheQueueHeaderKey(name), null); + } + }; CIX1<GridCacheQueueHeader> afterRmv = new CIX1<GridCacheQueueHeader>() { @Override public void applyx(GridCacheQueueHeader hdr) throws IgniteCheckedException { @@ -748,53 +685,56 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { boolean create) throws IgniteCheckedException { - Map<String, DataStructureInfo> dsMap = utilityCache.get(DATA_STRUCTURES_KEY); + dsCacheCtx.gate().enter(); - if (!create && (dsMap == null || !dsMap.containsKey(dsInfo.name))) - return null; + try { + Map<String, DataStructureInfo> dsMap = dsInfoView.get(DATA_STRUCTURES_KEY); - IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, create); + if (!create && (dsMap == null || !dsMap.containsKey(dsInfo.name))) + return null; - if (err != null) - throw err; + IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, create); - if (!create) { - DataStructureInfo oldInfo = dsMap.get(dsInfo.name); + if (err != null) + throw err; - assert oldInfo.info instanceof CollectionInfo : oldInfo.info; + if (!create) { + DataStructureInfo oldInfo = dsMap.get(dsInfo.name); - String cacheName = ((CollectionInfo)oldInfo.info).cacheName; + assert oldInfo.info instanceof CollectionInfo : oldInfo.info; - GridCacheContext cacheCtx = ctx.cache().internalCache(cacheName).context(); + String cacheName = ((CollectionInfo)oldInfo.info).cacheName; - return c.applyx(cacheCtx); - } + GridCacheContext cacheCtx = ctx.cache().internalCache(cacheName).context(); - T col; + return c.applyx(cacheCtx); + } - try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { - T2<String, IgniteCheckedException> res = - utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get(); + T col; - err = res.get2(); + try (IgniteInternalTx tx = dsInfoView.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + T2<String, IgniteCheckedException> res = + dsInfoView.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get(); - if (err != null) - throw err; + err = res.get2(); - String cacheName = res.get1(); + if (err != null) + throw err; - final GridCacheContext cacheCtx = ctx.cache().internalCache(cacheName).context(); + String cacheName = res.get1(); - col = ctx.closure().callLocalSafe(new Callable<T>() { - @Override public T call() throws Exception { - return c.applyx(cacheCtx); - } - }, false).get(); + final GridCacheContext cacheCtx = ctx.cache().internalCache(cacheName).context(); - tx.commit(); - } + col = c.applyx(cacheCtx); + + tx.commit(); + } - return col; + return col; + } + finally { + dsCacheCtx.gate().leave(); + } } /** @@ -846,15 +786,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { checkAtomicsConfiguration(); - return getAtomic(new IgniteOutClosureX<IgniteCountDownLatch>() { - @Override public IgniteCountDownLatch applyx() throws IgniteCheckedException { + return getAtomic(new IgniteClosureX<IgniteInternalTx, IgniteCountDownLatch>() { + @Override public IgniteCountDownLatch applyx(IgniteInternalTx tx) throws IgniteCheckedException { GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); - dsCacheCtx.gate().enter(); - - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheCountDownLatchValue val = cast(dsView.get(key), - GridCacheCountDownLatchValue.class); + try { + GridCacheCountDownLatchValue val = cast(dsView.get(key), GridCacheCountDownLatchValue.class); // Check that count down hasn't been created in other thread yet. GridCacheCountDownLatchEx latch = cast(dsMap.get(key), GridCacheCountDownLatchEx.class); @@ -879,8 +816,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsMap.put(key, latch); - tx.commit(); - return latch; } catch (Error | Exception e) { @@ -890,9 +825,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { throw e; } - finally { - dsCacheCtx.gate().leave(); - } } }, new DataStructureInfo(name, COUNT_DOWN_LATCH, null), create, GridCacheCountDownLatchEx.class); } @@ -907,38 +839,25 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { assert name != null; assert dsCacheCtx != null; - removeDataStructure(new IgniteCallable<Void>() { - @Override - public Void call() throws Exception { + removeDataStructure(new IgniteClosureX<IgniteInternalTx, Void>() { + @Override public Void applyx(IgniteInternalTx tx) throws IgniteCheckedException { GridCacheInternal key = new GridCacheInternalKeyImpl(name); - dsCacheCtx.gate().enter(); - - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { - // Check correctness type of removable object. - GridCacheCountDownLatchValue val = - cast(dsView.get(key), GridCacheCountDownLatchValue.class); - - if (val != null) { - if (val.get() > 0) { - throw new IgniteCheckedException("Failed to remove count down latch " + - "with non-zero count: " + val.get()); - } - - dsView.removex(key); - - tx.commit(); - } else - tx.setRollbackOnly(); + // Check correctness type of removable object. + GridCacheCountDownLatchValue val = + cast(dsView.get(key), GridCacheCountDownLatchValue.class); - return null; - } catch (Error | Exception e) { - U.error(log, "Failed to remove data structure: " + key, e); + if (val != null) { + if (val.get() > 0) + throw new IgniteCheckedException("Failed to remove count down latch " + + "with non-zero count: " + val.get()); - throw e; - } finally { - dsCacheCtx.gate().leave(); + dsView.removex(key); } + else + tx.setRollbackOnly(); + + return null; } }, name, COUNT_DOWN_LATCH, null); } @@ -946,38 +865,27 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** * Remove internal entry by key from cache. * + * @param tx Transaction for internal atomics cache. * @param key Internal entry key. * @param cls Class of object which will be removed. If cached object has different type exception will be thrown. * @return Method returns true if sequence has been removed and false if it's not cached. * @throws IgniteCheckedException If removing failed or class of object is different to expected class. */ - private <R> boolean removeInternal(final GridCacheInternal key, final Class<R> cls) throws IgniteCheckedException { - return CU.outTx( - new Callable<Boolean>() { - @Override public Boolean call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { - // Check correctness type of removable object. - R val = cast(dsView.get(key), cls); - - if (val != null) { - dsView.removex(key); - - tx.commit(); - } - else - tx.setRollbackOnly(); + private <R> boolean removeInternal( + IgniteInternalTx tx, + GridCacheInternal key, + final Class<R> cls) + throws IgniteCheckedException + { + // Check correctness type of removable object. + R val = cast(dsView.get(key), cls); - return val != null; - } - catch (Error | Exception e) { - U.error(log, "Failed to remove data structure: " + key, e); + if (val != null) + dsView.removex(key); + else + tx.setRollbackOnly(); - throw e; - } - } - }, - dsCacheCtx - ); + return val != null; } /** @@ -1084,11 +992,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { assert name != null; assert cctx != null; - IgniteCallable<GridCacheSetHeader> rmv = new IgniteCallable<GridCacheSetHeader>() { - @Override public GridCacheSetHeader call() throws Exception { - return (GridCacheSetHeader)retryRemove(cctx.cache(), new GridCacheSetHeaderKey(name)); - } - }; + IgniteClosureX<IgniteInternalTx, GridCacheSetHeader> rmv = + new IgniteClosureX<IgniteInternalTx, GridCacheSetHeader>() { + @Override public GridCacheSetHeader applyx(IgniteInternalTx tx) throws IgniteCheckedException { + return (GridCacheSetHeader)cctx.cache().remove(new GridCacheSetHeaderKey(name), null); + } + }; CIX1<GridCacheSetHeader> afterRmv = new CIX1<GridCacheSetHeader>() { @Override public void applyx(GridCacheSetHeader hdr) throws IgniteCheckedException { @@ -1100,21 +1009,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { } /** - * @param cache Cache. - * @param key Key to remove. - * @throws IgniteCheckedException If failed. - * @return Removed value. - */ - @SuppressWarnings("unchecked") - @Nullable private <T> T retryRemove(final GridCache cache, final Object key) throws IgniteCheckedException { - return retry(log, new Callable<T>() { - @Nullable @Override public T call() throws Exception { - return (T)cache.remove(key); - } - }); - } - - /** * @param log Logger. * @param call Callable. * @return Callable result. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/821c016b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java index 99beb0f..497b6b0 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java @@ -62,9 +62,8 @@ public class IgniteCacheDataStructuresSelfTestSuite extends TestSuite { suite.addTest(new TestSuite(GridCachePartitionedAtomicQueueApiSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedQueueMultiNodeSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedAtomicQueueMultiNodeSelfTest.class)); - // TODO: IGNITE-80. - //suite.addTest(new TestSuite(GridCachePartitionedQueueCreateMultiNodeSelfTest.class)); - //suite.addTest(new TestSuite(GridCachePartitionedAtomicQueueCreateMultiNodeSelfTest.class)); + suite.addTest(new TestSuite(GridCachePartitionedQueueCreateMultiNodeSelfTest.class)); + suite.addTest(new TestSuite(GridCachePartitionedAtomicQueueCreateMultiNodeSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedSetSelfTest.class)); suite.addTest(new TestSuite(IgnitePartitionedSetNoBackupsSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedAtomicSetSelfTest.class));