# ignite-743
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/23a41dfd Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/23a41dfd Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/23a41dfd Branch: refs/heads/ignite-755 Commit: 23a41dfd2b7904c72a9fdebe35ef00b02ad62b3a Parents: d6434bb Author: sboikov <sboi...@gridgain.com> Authored: Thu Apr 16 10:51:10 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Apr 16 11:03:10 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheGateway.java | 67 +++++----- .../processors/cache/IgniteCacheProxy.java | 129 +++++++------------ .../datastreamer/DataStreamerUpdateJob.java | 12 -- 3 files changed, 81 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23a41dfd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java index 97fada9..aa73414 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java @@ -66,10 +66,10 @@ public class GridCacheGateway<K, V> { /** * Enter a cache call. * - * @return {@code true} if enter successful, {@code false} if the cache or the node was stopped. + * @return {@code True} if enter successful, {@code false} if the cache or the node was stopped. */ public boolean enterIfNotClosed() { - enterIfNotClosedNoLock(); + onEnter(); // Must unlock in case of unexpected errors to avoid // deadlocks during kernal stop. @@ -87,17 +87,16 @@ public class GridCacheGateway<K, V> { /** * Enter a cache call without lock. * - * @return {@code true} if enter successful, {@code false} if the cache or the node was stopped. + * @return {@code True} if enter successful, {@code false} if the cache or the node was stopped. */ public boolean enterIfNotClosedNoLock() { - if (ctx.deploymentEnabled()) - ctx.deploy().onEnter(); + onEnter(); return !stopped; } /** - * Leave a cache call entered by {@link #enter()} method. + * Leave a cache call entered by {@link #enterNoLock} method. */ public void leaveNoLock() { ctx.tm().resetContext(); @@ -125,6 +124,22 @@ public class GridCacheGateway<K, V> { * @return Previous projection set on this thread. */ @Nullable public GridCacheProjectionImpl<K, V> enter(@Nullable GridCacheProjectionImpl<K, V> prj) { + try { + GridCacheAdapter<K, V> cache = ctx.cache(); + + GridCachePreloader<K, V> preldr = cache != null ? cache.preloader() : null; + + if (preldr == null) + throw new IllegalStateException("Grid is in invalid state to perform this operation. " + + "It either not started yet or has already being or have stopped [gridName=" + ctx.gridName() + ']'); + + preldr.startFuture().get(); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to wait for cache preloader start [cacheName=" + + ctx.name() + "]", e); + } + onEnter(); rwLock.readLock(); @@ -132,7 +147,7 @@ public class GridCacheGateway<K, V> { if (stopped) { rwLock.readUnlock(); - throw new IllegalStateException("Dynamic cache has been stopped: " + ctx.name()); + throw new IllegalStateException("Cache has been stopped: " + ctx.name()); } // Must unlock in case of unexpected errors to avoid @@ -155,38 +170,12 @@ public class GridCacheGateway<K, V> { onEnter(); if (stopped) - throw new IllegalStateException("Dynamic cache has been stopped: " + ctx.name()); + throw new IllegalStateException("Cache has been stopped: " + ctx.name()); return setProjectionPerCall(prj); } /** - * On enter. - */ - private void onEnter() { - try { - ctx.itHolder().checkWeakQueue(); - - GridCacheAdapter<K, V> cache = ctx.cache(); - - GridCachePreloader<K, V> preldr = cache != null ? cache.preloader() : null; - - if (preldr == null) - throw new IllegalStateException("Grid is in invalid state to perform this operation. " + - "It either not started yet or has already being or have stopped [gridName=" + ctx.gridName() + ']'); - - preldr.startFuture().get(); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to wait for cache preloader start [cacheName=" + - ctx.name() + "]", e); - } - - if (ctx.deploymentEnabled()) - ctx.deploy().onEnter(); - } - - /** * Set thread local projection per call. * * @param prj Projection to guard. @@ -230,6 +219,16 @@ public class GridCacheGateway<K, V> { /** * */ + private void onEnter() { + ctx.itHolder().checkWeakQueue(); + + if (ctx.deploymentEnabled()) + ctx.deploy().onEnter(); + } + + /** + * + */ public void block() { stopped = true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23a41dfd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index d7ef8ba..c1a2d6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -81,7 +81,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V @GridToStringExclude private CacheManager cacheMgr; - /** */ + /** If {@code false} does not acquire read lock on gateway enter. */ @GridToStringExclude private boolean lock; @@ -104,7 +104,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V @Nullable GridCacheProjectionImpl<K, V> prj, boolean async ) { - this(ctx, delegate, prj, async, false); + this(ctx, delegate, prj, async, true); } /** @@ -112,12 +112,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @param delegate Delegate. * @param prj Projection. * @param async Async support flag. + * @param lock If {@code false} does not acquire read lock on gateway enter. */ - public IgniteCacheProxy( + private IgniteCacheProxy( GridCacheContext<K, V> ctx, GridCacheProjectionEx<K, V> delegate, @Nullable GridCacheProjectionImpl<K, V> prj, - boolean async, boolean lock + boolean async, + boolean lock ) { super(async); @@ -136,10 +138,13 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** + * Gets cache proxy which does not acquire read lock on gateway enter, should be + * used only if grid read lock is externally acquired. + * * @return Ignite cache proxy with simple gate. */ public IgniteCacheProxy<K, V> cacheNoGate() { - return new IgniteCacheProxy<>(ctx, delegate, prj, isAsync(), true); + return new IgniteCacheProxy<>(ctx, delegate, prj, isAsync(), false); } /** @@ -234,7 +239,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V try { GridCacheProjectionEx<K, V> prj0 = prj != null ? prj.withExpiryPolicy(plc) : delegate.withExpiryPolicy(plc); - return new IgniteCacheProxy<>(ctx, prj0, (GridCacheProjectionImpl<K, V>)prj0, isAsync()); + return new IgniteCacheProxy<>(ctx, + prj0, + (GridCacheProjectionImpl<K, V>)prj0, + isAsync(), + lock); } finally { onLeave(prev); @@ -779,8 +788,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (completionLsnr != null) { fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override - public void apply(IgniteInternalFuture<?> fut) { + @Override public void apply(IgniteInternalFuture<?> fut) { try { fut.get(); @@ -1343,7 +1351,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** - * + * @return Proxy delegate. */ public GridCacheProjectionEx delegate() { return delegate; @@ -1406,7 +1414,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override protected IgniteCache<K, V> createAsyncInstance() { - return new IgniteCacheProxy<>(ctx, delegate, prj, true); + return new IgniteCacheProxy<>(ctx, delegate, prj, true, lock); } /** @@ -1447,7 +1455,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)ctx, prj0, prj0, - isAsync()); + isAsync(), + lock); } finally { onLeave(prev); @@ -1477,7 +1486,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return new IgniteCacheProxy<>(ctx, prj0, prj0, - isAsync()); + isAsync(), + lock); } finally { onLeave(prev); @@ -1507,38 +1517,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return legacyProxy; } - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(ctx); - - out.writeObject(delegate); - - out.writeObject(prj); - - out.writeBoolean(lock); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"unchecked"}) - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - ctx = (GridCacheContext<K, V>)in.readObject(); - - delegate = (GridCacheProjectionEx<K, V>)in.readObject(); - - prj = (GridCacheProjectionImpl<K, V>)in.readObject(); - - gate = ctx.gate(); - - lock = in.readBoolean(); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> rebalance() { - ctx.preloader().forcePreload(); - - return new IgniteFutureImpl<>(ctx.preloader().syncFuture()); - } - /** * @param prj Projection to guard. * @return Previous projection set on this thread. @@ -1552,6 +1530,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** * On enter. + * + * @return {@code True} if enter successful. */ private boolean onEnterIfNoClose() { if (lock) @@ -1581,52 +1561,39 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteCacheProxy.class, this); - } + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(ctx); - /** - * Closeable iterator. - */ - private abstract static class ClIter<X, Y> extends GridCloseableIteratorAdapter<Y> { - /** */ - private X cur; + out.writeObject(delegate); - /** */ - private CacheQueryFuture<X> fut; + out.writeObject(prj); - /** - * @param fut Future. - */ - protected ClIter(CacheQueryFuture<X> fut) { - this.fut = fut; - } + out.writeBoolean(lock); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked"}) + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + ctx = (GridCacheContext<K, V>)in.readObject(); - /** {@inheritDoc} */ - @Override protected Y onNext() throws IgniteCheckedException { - if (!onHasNext()) - throw new NoSuchElementException(); + delegate = (GridCacheProjectionEx<K, V>)in.readObject(); - X e = cur; + prj = (GridCacheProjectionImpl<K, V>)in.readObject(); - cur = null; + gate = ctx.gate(); - return convert(e); - } + lock = in.readBoolean(); + } - /** - * @param x X. - */ - protected abstract Y convert(X x); + /** {@inheritDoc} */ + @Override public IgniteFuture<?> rebalance() { + ctx.preloader().forcePreload(); - /** {@inheritDoc} */ - @Override protected boolean onHasNext() throws IgniteCheckedException { - return cur != null || (cur = fut.next()) != null; - } + return new IgniteFutureImpl<>(ctx.preloader().syncFuture()); + } - /** {@inheritDoc} */ - @Override protected void onClose() throws IgniteCheckedException { - fut.cancel(); - } + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteCacheProxy.class, this); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23a41dfd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java index 52471cd..21ba3ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java @@ -88,18 +88,6 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> { if (log.isDebugEnabled()) log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']'); -// TODO IGNITE-77: restore adapter usage. -// TODO use cacheContext.awaitStarted() instead of preloader().startFuture().get() -// GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); -// -// IgniteFuture<?> f = cache.context().preloader().startFuture(); -// -// if (!f.isDone()) -// f.get(); -// -// if (ignoreDepOwnership) -// cache.context().deploy().ignoreOwnership(true); - IgniteCacheProxy cache = ctx.cache().jcache(cacheName).cacheNoGate(); cache.context().awaitStarted();