ignite-1007 Race in data structures processor
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/811872ce Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/811872ce Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/811872ce Branch: refs/heads/ignite-gg-10411 Commit: 811872ce692666fe8c77235f175b7ec15f717d30 Parents: 5160088 Author: agura <ag...@gridgain.com> Authored: Fri Jun 12 18:36:58 2015 +0300 Committer: agura <ag...@gridgain.com> Committed: Fri Jun 12 18:36:58 2015 +0300 ---------------------------------------------------------------------- .../datastructures/DataStructuresProcessor.java | 67 +++++++++++++++++++- 1 file changed, 64 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/811872ce/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index aa3bfe2..473a2ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -67,6 +67,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** */ private static final long RETRY_DELAY = 1; + /** Initialization latch. */ + private final CountDownLatch initLatch = new CountDownLatch(1); + + /** Initialization failed flag. */ + private boolean initFailed; + /** Cache contains only {@code GridCacheInternal,GridCacheInternal}. */ private IgniteInternalCache<GridCacheInternal, GridCacheInternal> dsView; @@ -145,6 +151,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsCacheCtx = atomicsCache.context(); } + + initLatch.countDown(); } /** @@ -167,6 +175,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { @Override public void onKernalStop(boolean cancel) { super.onKernalStop(cancel); + if (initLatch.getCount() > 0) { + initFailed = true; + + initLatch.countDown(); + } + if (qryId != null) dsCacheCtx.continuousQueries().cancelInternalQuery(qryId); } @@ -187,6 +201,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { { A.notNull(name, "name"); + awaitInitialization(); + checkAtomicsConfiguration(); startQuery(); @@ -277,6 +293,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { public final void removeSequence(final String name) throws IgniteCheckedException { assert name != null; + awaitInitialization(); + checkAtomicsConfiguration(); removeDataStructure(new IgniteCallable<Void>() { @@ -315,6 +333,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { final boolean create) throws IgniteCheckedException { A.notNull(name, "name"); + awaitInitialization(); + checkAtomicsConfiguration(); startQuery(); @@ -431,6 +451,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { assert name != null; assert dsCacheCtx != null; + awaitInitialization(); + removeDataStructure(new IgniteCallable<Void>() { @Override public Void call() throws Exception { dsCacheCtx.gate().enter(); @@ -520,6 +542,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { { A.notNull(name, "name"); + awaitInitialization(); + checkAtomicsConfiguration(); startQuery(); @@ -585,6 +609,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { assert name != null; assert dsCacheCtx != null; + awaitInitialization(); + removeDataStructure(new IgniteCallable<Void>() { @Override public Void call() throws Exception { dsCacheCtx.gate().enter(); @@ -623,6 +649,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { final S initStamp, final boolean create) throws IgniteCheckedException { A.notNull(name, "name"); + awaitInitialization(); + checkAtomicsConfiguration(); startQuery(); @@ -688,6 +716,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { assert name != null; assert dsCacheCtx != null; + awaitInitialization(); + removeDataStructure(new IgniteCallable<Void>() { @Override public Void call() throws Exception { dsCacheCtx.gate().enter(); @@ -725,6 +755,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { throws IgniteCheckedException { A.notNull(name, "name"); + awaitInitialization(); + String cacheName = null; if (cfg != null) { @@ -801,6 +833,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { assert name != null; assert cctx != null; + awaitInitialization(); + IgniteCallable<GridCacheQueueHeader> rmv = new IgniteCallable<GridCacheQueueHeader>() { @Override public GridCacheQueueHeader call() throws Exception { return (GridCacheQueueHeader)retryRemove(cctx.cache(), new GridCacheQueueHeaderKey(name)); @@ -837,6 +871,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { boolean create) throws IgniteCheckedException { + awaitInitialization(); + Map<String, DataStructureInfo> dsMap = utilityCache.get(DATA_STRUCTURES_KEY); if (!create && (dsMap == null || !dsMap.containsKey(dsInfo.name))) @@ -887,6 +923,24 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { } /** + * Awaits for processor initialization. + */ + private void awaitInitialization() { + if (initLatch.getCount() > 0) { + try { + U.await(initLatch); + + if (initFailed) + throw new IllegalStateException("Failed to initialize data structures processor."); + } + catch (IgniteInterruptedCheckedException e) { + throw new IllegalStateException("Failed to initialize data structures processor " + + "(thread has been interrupted).", e); + } + } + } + + /** * @param dsMap Map with data structure information. * @param info New data structure information. * @param create Create flag. @@ -930,6 +984,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { { A.notNull(name, "name"); + awaitInitialization(); + if (create) A.ensure(cnt >= 0, "count can not be negative"); @@ -997,9 +1053,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { assert name != null; assert dsCacheCtx != null; + awaitInitialization(); + removeDataStructure(new IgniteCallable<Void>() { - @Override - public Void call() throws Exception { + @Override public Void call() throws Exception { GridCacheInternal key = new GridCacheInternalKeyImpl(name); dsCacheCtx.gate().enter(); @@ -1169,6 +1226,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { throws IgniteCheckedException { A.notNull(name, "name"); + awaitInitialization(); + String cacheName = null; if (cfg != null) @@ -1196,6 +1255,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { assert name != null; assert cctx != null; + awaitInitialization(); + IgniteCallable<GridCacheSetHeader> rmv = new IgniteCallable<GridCacheSetHeader>() { @Override public GridCacheSetHeader call() throws Exception { return (GridCacheSetHeader)retryRemove(cctx.cache(), new GridCacheSetHeaderKey(name)); @@ -1326,7 +1387,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** * */ - static enum DataStructureType { + enum DataStructureType { /** */ ATOMIC_LONG(IgniteAtomicLong.class.getSimpleName()),