# ignite-743

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/23a41dfd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/23a41dfd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/23a41dfd

Branch: refs/heads/ignite-gg-9613
Commit: 23a41dfd2b7904c72a9fdebe35ef00b02ad62b3a
Parents: d6434bb
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Apr 16 10:51:10 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Apr 16 11:03:10 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheGateway.java      |  67 +++++-----
 .../processors/cache/IgniteCacheProxy.java      | 129 +++++++------------
 .../datastreamer/DataStreamerUpdateJob.java     |  12 --
 3 files changed, 81 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23a41dfd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index 97fada9..aa73414 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -66,10 +66,10 @@ public class GridCacheGateway<K, V> {
     /**
      * Enter a cache call.
      *
-     * @return {@code true} if enter successful, {@code false} if the cache or the node was stopped.
+     * @return {@code True} if enter successful, {@code false} if the cache or the node was stopped.
      */
     public boolean enterIfNotClosed() {
-        enterIfNotClosedNoLock();
+        onEnter();
 
         // Must unlock in case of unexpected errors to avoid
         // deadlocks during kernal stop.
@@ -87,17 +87,16 @@ public class GridCacheGateway<K, V> {
     /**
      * Enter a cache call without lock.
      *
-     * @return {@code true} if enter successful, {@code false} if the cache or the node was stopped.
+     * @return {@code True} if enter successful, {@code false} if the cache or the node was stopped.
      */
     public boolean enterIfNotClosedNoLock() {
-        if (ctx.deploymentEnabled())
-            ctx.deploy().onEnter();
+        onEnter();
 
         return !stopped;
     }
 
     /**
-     * Leave a cache call entered by {@link #enter()} method.
+     * Leave a cache call entered by {@link #enterNoLock} method.
      */
     public void leaveNoLock() {
         ctx.tm().resetContext();
@@ -125,6 +124,22 @@ public class GridCacheGateway<K, V> {
      * @return Previous projection set on this thread.
      */
     @Nullable public GridCacheProjectionImpl<K, V> enter(@Nullable GridCacheProjectionImpl<K, V> prj) {
+        try {
+            GridCacheAdapter<K, V> cache = ctx.cache();
+
+            GridCachePreloader<K, V> preldr = cache != null ? cache.preloader() : null;
+
+            if (preldr == null)
+                throw new IllegalStateException("Grid is in invalid state to perform this operation. " +
+                    "It either not started yet or has already being or have stopped [gridName=" + ctx.gridName() + ']');
+
+            preldr.startFuture().get();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Failed to wait for cache preloader start [cacheName=" +
+                ctx.name() + "]", e);
+        }
+
         onEnter();
 
         rwLock.readLock();
@@ -132,7 +147,7 @@ public class GridCacheGateway<K, V> {
         if (stopped) {
             rwLock.readUnlock();
 
-            throw new IllegalStateException("Dynamic cache has been stopped: " + ctx.name());
+            throw new IllegalStateException("Cache has been stopped: " + ctx.name());
         }
 
         // Must unlock in case of unexpected errors to avoid
@@ -155,38 +170,12 @@ public class GridCacheGateway<K, V> {
         onEnter();
 
         if (stopped)
-            throw new IllegalStateException("Dynamic cache has been stopped: " + ctx.name());
+            throw new IllegalStateException("Cache has been stopped: " + ctx.name());
 
         return setProjectionPerCall(prj);
     }
 
     /**
-     * On enter.
-     */
-    private void onEnter() {
-        try {
-            ctx.itHolder().checkWeakQueue();
-
-            GridCacheAdapter<K, V> cache = ctx.cache();
-
-            GridCachePreloader<K, V> preldr = cache != null ? cache.preloader() : null;
-
-            if (preldr == null)
-                throw new IllegalStateException("Grid is in invalid state to perform this operation. " +
-                    "It either not started yet or has already being or have stopped [gridName=" + ctx.gridName() + ']');
-
-            preldr.startFuture().get();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException("Failed to wait for cache preloader start [cacheName=" +
-                ctx.name() + "]", e);
-        }
-
-        if (ctx.deploymentEnabled())
-            ctx.deploy().onEnter();
-    }
-
-    /**
      * Set thread local projection per call.
      *
      * @param prj Projection to guard.
@@ -230,6 +219,16 @@ public class GridCacheGateway<K, V> {
     /**
      *
      */
+    private void onEnter() {
+        ctx.itHolder().checkWeakQueue();
+
+        if (ctx.deploymentEnabled())
+            ctx.deploy().onEnter();
+    }
+
+    /**
+     *
+     */
     public void block() {
         stopped = true;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23a41dfd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index d7ef8ba..c1a2d6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -81,7 +81,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     @GridToStringExclude
     private CacheManager cacheMgr;
 
-    /** */
+    /** If {@code false} does not acquire read lock on gateway enter. */
     @GridToStringExclude
     private boolean lock;
 
@@ -104,7 +104,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         @Nullable GridCacheProjectionImpl<K, V> prj,
         boolean async
     ) {
-        this(ctx, delegate, prj, async, false);
+        this(ctx, delegate, prj, async, true);
     }
 
     /**
@@ -112,12 +112,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
      * @param delegate Delegate.
      * @param prj Projection.
      * @param async Async support flag.
+     * @param lock If {@code false} does not acquire read lock on gateway enter.
      */
-    public IgniteCacheProxy(
+    private IgniteCacheProxy(
         GridCacheContext<K, V> ctx,
         GridCacheProjectionEx<K, V> delegate,
         @Nullable GridCacheProjectionImpl<K, V> prj,
-        boolean async, boolean lock
+        boolean async,
+        boolean lock
     ) {
         super(async);
 
@@ -136,10 +138,13 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /**
+     * Gets cache proxy which does not acquire read lock on gateway enter, should be
+     * used only if grid read lock is externally acquired.
+     *
      * @return Ignite cache proxy with simple gate.
      */
     public IgniteCacheProxy<K, V> cacheNoGate() {
-        return new IgniteCacheProxy<>(ctx, delegate, prj, isAsync(), true);
+        return new IgniteCacheProxy<>(ctx, delegate, prj, isAsync(), false);
     }
 
     /**
@@ -234,7 +239,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         try {
             GridCacheProjectionEx<K, V> prj0 = prj != null ? prj.withExpiryPolicy(plc) : delegate.withExpiryPolicy(plc);
 
-            return new IgniteCacheProxy<>(ctx, prj0, (GridCacheProjectionImpl<K, V>)prj0, isAsync());
+            return new IgniteCacheProxy<>(ctx,
+                prj0,
+                (GridCacheProjectionImpl<K, V>)prj0,
+                isAsync(),
+                lock);
         }
         finally {
             onLeave(prev);
@@ -779,8 +788,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
 
             if (completionLsnr != null) {
                 fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                    @Override
-                    public void apply(IgniteInternalFuture<?> fut) {
+                    @Override public void apply(IgniteInternalFuture<?> fut) {
                         try {
                             fut.get();
 
@@ -1343,7 +1351,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /**
-     *
+     * @return Proxy delegate.
      */
     public GridCacheProjectionEx delegate() {
         return delegate;
@@ -1406,7 +1414,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
 
     /** {@inheritDoc} */
     @Override protected IgniteCache<K, V> createAsyncInstance() {
-        return new IgniteCacheProxy<>(ctx, delegate, prj, true);
+        return new IgniteCacheProxy<>(ctx, delegate, prj, true, lock);
     }
 
     /**
@@ -1447,7 +1455,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
             return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)ctx,
                 prj0,
                 prj0,
-                isAsync());
+                isAsync(),
+                lock);
         }
         finally {
             onLeave(prev);
@@ -1477,7 +1486,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
             return new IgniteCacheProxy<>(ctx,
                 prj0,
                 prj0,
-                isAsync());
+                isAsync(),
+                lock);
         }
         finally {
             onLeave(prev);
@@ -1507,38 +1517,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         return legacyProxy;
     }
 
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeObject(ctx);
-
-        out.writeObject(delegate);
-
-        out.writeObject(prj);
-
-        out.writeBoolean(lock);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"unchecked"})
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        ctx = (GridCacheContext<K, V>)in.readObject();
-
-        delegate = (GridCacheProjectionEx<K, V>)in.readObject();
-
-        prj = (GridCacheProjectionImpl<K, V>)in.readObject();
-
-        gate = ctx.gate();
-
-        lock = in.readBoolean();
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> rebalance() {
-        ctx.preloader().forcePreload();
-
-        return new IgniteFutureImpl<>(ctx.preloader().syncFuture());
-    }
-
     /**
      * @param prj Projection to guard.
      * @return Previous projection set on this thread.
@@ -1552,6 +1530,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
 
     /**
      * On enter.
+     *
+     * @return {@code True} if enter successful.
      */
     private boolean onEnterIfNoClose() {
         if (lock)
@@ -1581,52 +1561,39 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgniteCacheProxy.class, this);
-    }
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(ctx);
 
-    /**
-     * Closeable iterator.
-     */
-    private abstract static class ClIter<X, Y> extends GridCloseableIteratorAdapter<Y> {
-        /** */
-        private X cur;
+        out.writeObject(delegate);
 
-        /** */
-        private CacheQueryFuture<X> fut;
+        out.writeObject(prj);
 
-        /**
-         * @param fut Future.
-         */
-        protected ClIter(CacheQueryFuture<X> fut) {
-            this.fut = fut;
-        }
+        out.writeBoolean(lock);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"unchecked"})
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        ctx = (GridCacheContext<K, V>)in.readObject();
 
-        /** {@inheritDoc} */
-        @Override protected Y onNext() throws IgniteCheckedException {
-            if (!onHasNext())
-                throw new NoSuchElementException();
+        delegate = (GridCacheProjectionEx<K, V>)in.readObject();
 
-            X e = cur;
+        prj = (GridCacheProjectionImpl<K, V>)in.readObject();
 
-            cur = null;
+        gate = ctx.gate();
 
-            return convert(e);
-        }
+        lock = in.readBoolean();
+    }
 
-        /**
-         * @param x X.
-         */
-        protected abstract Y convert(X x);
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> rebalance() {
+        ctx.preloader().forcePreload();
 
-        /** {@inheritDoc} */
-        @Override protected boolean onHasNext() throws IgniteCheckedException {
-            return cur != null || (cur = fut.next()) != null;
-        }
+        return new IgniteFutureImpl<>(ctx.preloader().syncFuture());
+    }
 
-        /** {@inheritDoc} */
-        @Override protected void onClose() throws IgniteCheckedException {
-            fut.cancel();
-        }
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteCacheProxy.class, this);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23a41dfd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
index 52471cd..21ba3ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
@@ -88,18 +88,6 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> {
         if (log.isDebugEnabled())
             log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']');
 
-//        TODO IGNITE-77: restore adapter usage.
-//        TODO use cacheContext.awaitStarted() instead of preloader().startFuture().get()
-//        GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
-//
-//        IgniteFuture<?> f = cache.context().preloader().startFuture();
-//
-//        if (!f.isDone())
-//            f.get();
-//
-//        if (ignoreDepOwnership)
-//            cache.context().deploy().ignoreOwnership(true);
-
         IgniteCacheProxy cache = ctx.cache().jcache(cacheName).cacheNoGate();
 
         cache.context().awaitStarted();

Reply via email to