Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-758

Conflicts:
        
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
        
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/960b0a3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/960b0a3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/960b0a3e

Branch: refs/heads/ignite-709
Commit: 960b0a3ee91bdeb92696538b38bd51a8440ed924
Parents: 3b5fa57 357a715
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Fri Apr 17 14:30:19 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Fri Apr 17 14:30:19 2015 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |  28 ++
 .../processors/cache/GridCacheAtomicFuture.java |   7 -
 .../processors/cache/GridCacheGateway.java      | 111 +++++--
 .../processors/cache/IgniteCacheProxy.java      | 295 +++++++++--------
 .../dht/atomic/GridDhtAtomicCache.java          |  51 +--
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  19 --
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  14 -
 .../datastreamer/DataStreamerCacheUpdaters.java |   2 +-
 .../datastreamer/DataStreamerImpl.java          |  81 +++--
 .../datastreamer/DataStreamerUpdateJob.java     |  16 +-
 .../dr/IgniteDrDataStreamerCacheUpdater.java    |   2 -
 .../internal/GridLifecycleBeanSelfTest.java     |  36 +++
 .../GridCacheAtomicTimeoutSelfTest.java         | 314 -------------------
 .../IgniteCacheAtomicMessageRecoveryTest.java   |  32 ++
 .../IgniteCacheMessageRecoveryAbstractTest.java | 175 +++++++++++
 .../IgniteCacheTxMessageRecoveryTest.java       |  32 ++
 ...eAtomicInvalidPartitionHandlingSelfTest.java |  14 +-
 .../ignite/testsuites/IgniteCacheTestSuite.java |   3 +-
 .../query/h2/sql/GridSqlFunction.java           |   6 +-
 .../query/h2/sql/GridSqlPlaceholder.java        |  51 +++
 .../query/h2/sql/GridSqlQueryParser.java        |   6 +-
 .../h2/sql/AbstractH2CompareQueryTest.java      |  49 ++-
 .../query/h2/sql/GridQueryParsingTest.java      |   9 +
 .../query/h2/sql/H2CompareBigQueryTest.java     |   2 +-
 .../processors/query/h2/sql/bigQuery.sql        |   2 +-
 25 files changed, 711 insertions(+), 646 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960b0a3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index b654afd,aa73414..04c721a
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@@ -106,10 -123,8 +123,8 @@@ public class GridCacheGateway<K, V> 
       * @param prj Projection to guard.
       * @return Previous projection set on this thread.
       */
 -    @Nullable public GridCacheProjectionImpl<K, V> enter(@Nullable 
GridCacheProjectionImpl<K, V> prj) {
 +    @Nullable public CacheOperationContext enter(@Nullable 
CacheOperationContext prj) {
          try {
-             ctx.itHolder().checkWeakQueue();
- 
              GridCacheAdapter<K, V> cache = ctx.cache();
  
              GridCachePreloader<K, V> preldr = cache != null ? 
cache.preloader() : null;
@@@ -155,18 -163,39 +163,39 @@@
      }
  
      /**
+      * @param prj Projection to guard.
+      * @return Previous projection set on this thread.
+      */
+     @Nullable public GridCacheProjectionImpl<K, V> enterNoLock(@Nullable 
GridCacheProjectionImpl<K, V> prj) {
+         onEnter();
+ 
+         if (stopped)
+             throw new IllegalStateException("Cache has been stopped: " + 
ctx.name());
+ 
+         return setProjectionPerCall(prj);
+     }
+ 
+     /**
+      * Set thread local projection per call.
+      *
+      * @param prj Projection to guard.
+      * @return Previous projection set on this thread.
+      */
 -    private GridCacheProjectionImpl<K, V> setProjectionPerCall(@Nullable 
GridCacheProjectionImpl<K, V> prj) {
 -        GridCacheProjectionImpl<K, V> prev = ctx.projectionPerCall();
++    private CacheOperationContext setProjectionPerCall(@Nullable 
CacheOperationContext prj) {
++        CacheOperationContext prev = ctx.operationContextPerCall();
+ 
+         if (prev != null || prj != null)
 -            ctx.projectionPerCall(prj);
++            ctx.operationContextPerCall(prj);
+ 
+         return prev;
+     }
+ 
+     /**
       * @param prev Previous.
       */
 -    public void leave(GridCacheProjectionImpl<K, V> prev) {
 +    public void leave(CacheOperationContext prev) {
          try {
-             ctx.tm().resetContext();
-             ctx.mvcc().contextReset();
- 
-             // Unwind eviction notifications.
-             CU.unwindEvicts(ctx);
- 
-             // Return back previous thread local operation context per call.
-             ctx.operationContextPerCall(prev);
+             leaveNoLock(prev);
          }
          finally {
              rwLock.readUnlock();
@@@ -174,6 -203,30 +203,30 @@@
      }
  
      /**
+      * @param prev Previous.
+      */
 -    public void leaveNoLock(GridCacheProjectionImpl<K, V> prev) {
++    public void leaveNoLock(CacheOperationContext prev) {
+         ctx.tm().resetContext();
+         ctx.mvcc().contextReset();
+ 
+         // Unwind eviction notifications.
+         CU.unwindEvicts(ctx);
+ 
+         // Return back previous thread local projection per call.
 -        ctx.projectionPerCall(prev);
++        ctx.operationContextPerCall(prev);
+     }
+ 
+     /**
+      *
+      */
+     private void onEnter() {
+         ctx.itHolder().checkWeakQueue();
+ 
+         if (ctx.deploymentEnabled())
+             ctx.deploy().onEnter();
+     }
+ 
+     /**
       *
       */
      public void block() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960b0a3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 64d19ba,244e200..91f12b3
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@@ -97,10 -101,27 +101,27 @@@ public class IgniteCacheProxy<K, V> ext
       */
      public IgniteCacheProxy(
          GridCacheContext<K, V> ctx,
 -        GridCacheProjectionEx<K, V> delegate,
 -        @Nullable GridCacheProjectionImpl<K, V> prj,
 +        IgniteInternalCache<K, V> delegate,
 +        CacheOperationContext opCtx,
          boolean async
      ) {
 -        this(ctx, delegate, prj, async, true);
++        this(ctx, delegate, opCtx, async, true);
+     }
+ 
+     /**
+      * @param ctx Context.
+      * @param delegate Delegate.
+      * @param prj Projection.
+      * @param async Async support flag.
+      * @param lock If {@code false} does not acquire read lock on gateway 
enter.
+      */
+     private IgniteCacheProxy(
+         GridCacheContext<K, V> ctx,
 -        GridCacheProjectionEx<K, V> delegate,
 -        @Nullable GridCacheProjectionImpl<K, V> prj,
++        IgniteInternalCache<K, V> delegate,
++        @Nullable CacheOperationContext opCtx,
+         boolean async,
+         boolean lock
+     ) {
          super(async);
  
          assert ctx != null;
@@@ -112,7 -133,19 +133,19 @@@
  
          gate = ctx.gate();
  
 -        legacyProxy = new GridCacheProxyImpl<>(ctx, delegate, prj);
 +        legacyProxy = new GridCacheProxyImpl<>(ctx, delegate, opCtx);
+ 
+         this.lock = lock;
+     }
+ 
+     /**
+      * Gets cache proxy which does not acquire read lock on gateway enter, 
should be
+      * used only if grid read lock is externally acquired.
+      *
+      * @return Ignite cache proxy with simple gate.
+      */
+     public IgniteCacheProxy<K, V> cacheNoGate() {
 -        return new IgniteCacheProxy<>(ctx, delegate, prj, isAsync(), false);
++        return new IgniteCacheProxy<>(ctx, delegate, opCtx, isAsync(), false);
      }
  
      /**
@@@ -131,7 -164,7 +164,7 @@@
  
      /** {@inheritDoc} */
      @Override public CacheMetrics metrics() {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        CacheOperationContext prev = onEnter(opCtx);
  
          try {
              return ctx.cache().metrics();
@@@ -143,7 -176,7 +176,7 @@@
  
      /** {@inheritDoc} */
      @Override public CacheMetrics metrics(ClusterGroup grp) {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        CacheOperationContext prev = onEnter(opCtx);
  
          try {
              List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size());
@@@ -168,7 -201,7 +201,7 @@@
  
      /** {@inheritDoc} */
      @Override public CacheMetricsMXBean mxBean() {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
          try {
              return ctx.cache().mxBean();
@@@ -190,7 -223,7 +223,7 @@@
  
      /** {@inheritDoc} */
      @Nullable @Override public Entry<K, V> randomEntry() {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
          try {
              return ctx.cache().randomEntry();
@@@ -202,15 -235,19 +235,15 @@@
  
      /** {@inheritDoc} */
      @Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
          try {
-             CacheOperationContext prj0 = opCtx.withExpiryPolicy(plc);
+             GridCacheProjectionEx<K, V> prj0 = prj != null ? 
prj.withExpiryPolicy(plc) : delegate.withExpiryPolicy(plc);
  
-             return new IgniteCacheProxy<>(ctx, delegate, prj0, isAsync());
 -            return new IgniteCacheProxy<>(ctx,
 -                prj0,
 -                (GridCacheProjectionImpl<K, V>)prj0,
 -                isAsync(),
 -                lock);
++            return new IgniteCacheProxy<>(ctx, prj0, 
(GridCacheProjectionImpl<K, V>)prj0, isAsync());
          }
          finally {
 -            onLeave(prev);
 +            gate.leave(prev);
          }
      }
  
@@@ -222,7 -259,7 +255,7 @@@
      /** {@inheritDoc} */
      @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, 
@Nullable Object... args) {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
              try {
                  if (isAsync())
@@@ -242,7 -279,7 +275,7 @@@
      /** {@inheritDoc} */
      @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, 
@Nullable Object... args) {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
              try {
                  if (isAsync())
@@@ -262,7 -299,7 +295,7 @@@
      /** {@inheritDoc} */
      @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws 
CacheException {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
              try {
                  if (isAsync()) {
@@@ -294,7 -331,7 +327,7 @@@
  
      /** {@inheritDoc} */
      @Override public boolean isLocalLocked(K key, boolean byCurrThread) {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
          try {
              return byCurrThread ? delegate.isLockedByThread(key) : 
delegate.isLocked(key);
@@@ -449,7 -486,7 +482,7 @@@
      @Override public <R> QueryCursor<R> query(Query<R> qry) {
          A.notNull(qry, "qry");
  
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
          try {
              ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@@ -511,7 -548,7 +544,7 @@@
  
      /** {@inheritDoc} */
      @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... 
peekModes) throws CacheException {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
          try {
              return delegate.localEntries(peekModes);
@@@ -526,19 -563,19 +559,19 @@@
  
      /** {@inheritDoc} */
      @Override public QueryMetrics queryMetrics() {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
          try {
-             return delegate.context().queries().metrics();
+             return delegate.queries().metrics();
          }
          finally {
 -            onLeave(prev);
 +            gate.leave(prev);
          }
      }
  
      /** {@inheritDoc} */
      @Override public void localEvict(Collection<? extends K> keys) {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
          try {
              delegate.evictAll(keys);
@@@ -550,7 -587,7 +583,7 @@@
  
      /** {@inheritDoc} */
      @Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) 
{
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
          try {
              return delegate.localPeek(key, peekModes, null);
@@@ -566,7 -603,7 +599,7 @@@
      /** {@inheritDoc} */
      @Override public void localPromote(Set<? extends K> keys) throws 
CacheException {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
              try {
                  delegate.promoteAll(keys);
@@@ -582,7 -619,7 +615,7 @@@
  
      /** {@inheritDoc} */
      @Override public int size(CachePeekMode... peekModes) throws 
CacheException {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
          try {
              if (isAsync()) {
@@@ -603,7 -640,7 +636,7 @@@
  
      /** {@inheritDoc} */
      @Override public int localSize(CachePeekMode... peekModes) {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
          try {
              return delegate.localSize(peekModes);
@@@ -619,7 -656,7 +652,7 @@@
      /** {@inheritDoc} */
      @Override public V get(K key) {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
              try {
                  if (isAsync()) {
@@@ -642,7 -679,7 +675,7 @@@
      /** {@inheritDoc} */
      @Override public Map<K, V> getAll(Set<? extends K> keys) {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
              try {
                  if (isAsync()) {
@@@ -668,7 -705,7 +701,7 @@@
       */
      public Map<K, V> getAll(Collection<? extends K> keys) {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
              try {
                  if (isAsync()) {
@@@ -695,7 -732,7 +728,7 @@@
       * @return Entry set.
       */
      public Set<Entry<K, V>> entrySetx(CacheEntryPredicate... filter) {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
          try {
              return delegate.entrySetx(filter);
@@@ -707,7 -744,7 +740,7 @@@
  
      /** {@inheritDoc} */
      @Override public boolean containsKey(K key) {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
          try {
              if (isAsync()) {
@@@ -725,7 -762,7 +758,7 @@@
  
      /** {@inheritDoc} */
      @Override public boolean containsKeys(Set<? extends K> keys) {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
          try {
              if (isAsync()) {
@@@ -747,7 -784,7 +780,7 @@@
          boolean replaceExisting,
          @Nullable final CompletionListener completionLsnr
      ) {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
          try {
              IgniteInternalFuture<?> fut = ctx.cache().loadAll(keys, 
replaceExisting);
@@@ -775,7 -812,7 +808,7 @@@
      /** {@inheritDoc} */
      @Override public void put(K key, V val) {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
              try {
                  if (isAsync())
@@@ -795,7 -832,7 +828,7 @@@
      /** {@inheritDoc} */
      @Override public V getAndPut(K key, V val) {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
              try {
                  if (isAsync()) {
@@@ -818,7 -855,7 +851,7 @@@
      /** {@inheritDoc} */
      @Override public void putAll(Map<? extends K, ? extends V> map) {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
              try {
                  if (isAsync())
@@@ -838,7 -875,7 +871,7 @@@
      /** {@inheritDoc} */
      @Override public boolean putIfAbsent(K key, V val) {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
              try {
                  if (isAsync()) {
@@@ -861,7 -898,7 +894,7 @@@
      /** {@inheritDoc} */
      @Override public boolean remove(K key) {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
              try {
                  if (isAsync()) {
@@@ -884,7 -921,7 +917,7 @@@
      /** {@inheritDoc} */
      @Override public boolean remove(K key, V oldVal) {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
              try {
                  if (isAsync()) {
@@@ -907,7 -944,7 +940,7 @@@
      /** {@inheritDoc} */
      @Override public V getAndRemove(K key) {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
              try {
                  if (isAsync()) {
@@@ -930,7 -967,7 +963,7 @@@
      /** {@inheritDoc} */
      @Override public boolean replace(K key, V oldVal, V newVal) {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
              try {
                  if (isAsync()) {
@@@ -953,7 -990,7 +986,7 @@@
      /** {@inheritDoc} */
      @Override public boolean replace(K key, V val) {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
  
              try {
                  if (isAsync()) {
@@@ -976,7 -1013,7 +1009,7 @@@
      /** {@inheritDoc} */
      @Override public V getAndReplace(K key, V val) {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            CacheOperationContext prev = onEnter(opCtx);
  
              try {
                  if (isAsync()) {
@@@ -999,7 -1036,7 +1032,7 @@@
      /** {@inheritDoc} */
      @Override public void removeAll(Set<? extends K> keys) {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            CacheOperationContext prev = onEnter(opCtx);
  
              try {
                  if (isAsync())
@@@ -1018,7 -1055,7 +1051,7 @@@
  
      /** {@inheritDoc} */
      @Override public void removeAll() {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        CacheOperationContext prev = onEnter(opCtx);
  
          try {
              if (isAsync())
@@@ -1036,7 -1073,7 +1069,7 @@@
  
      /** {@inheritDoc} */
      @Override public void clear(K key) {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        CacheOperationContext prev = onEnter(opCtx);
  
          try {
              if (isAsync())
@@@ -1054,7 -1091,7 +1087,7 @@@
  
      /** {@inheritDoc} */
      @Override public void clearAll(Set<? extends K> keys) {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        CacheOperationContext prev = onEnter(opCtx);
  
          try {
              if (isAsync())
@@@ -1072,7 -1109,7 +1105,7 @@@
  
      /** {@inheritDoc} */
      @Override public void clear() {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        CacheOperationContext prev = onEnter(opCtx);
  
          try {
              if (isAsync())
@@@ -1090,7 -1127,7 +1123,7 @@@
  
      /** {@inheritDoc} */
      @Override public void localClear(K key) {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        CacheOperationContext prev = onEnter(opCtx);
  
          try {
              delegate.clearLocally(key);
@@@ -1102,7 -1139,7 +1135,7 @@@
  
      /** {@inheritDoc} */
      @Override public void localClearAll(Set<? extends K> keys) {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        CacheOperationContext prev = onEnter(opCtx);
  
          try {
              for (K key : keys)
@@@ -1117,7 -1154,7 +1150,7 @@@
      @Override public <T> T invoke(K key, EntryProcessor<K, V, T> 
entryProcessor, Object... args)
          throws EntryProcessorException {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            CacheOperationContext prev = onEnter(opCtx);
  
              try {
                  if (isAsync()) {
@@@ -1155,7 -1192,7 +1188,7 @@@
      @Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> 
entryProcessor, Object... args)
          throws EntryProcessorException {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            CacheOperationContext prev = onEnter(opCtx);
  
              try {
                  if (isAsync()) {
@@@ -1194,7 -1231,7 +1227,7 @@@
                                                                     
EntryProcessor<K, V, T> entryProcessor,
                                                                     Object... 
args) {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            CacheOperationContext prev = onEnter(opCtx);
  
              try {
                  if (isAsync()) {
@@@ -1219,7 -1256,7 +1252,7 @@@
          CacheEntryProcessor<K, V, T> entryProcessor,
          Object... args) {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            CacheOperationContext prev = onEnter(opCtx);
  
              try {
                  if (isAsync()) {
@@@ -1244,7 -1281,7 +1277,7 @@@
          Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
          Object... args) {
          try {
-             CacheOperationContext prev = gate.enter(opCtx);
 -            GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++            CacheOperationContext prev = onEnter(opCtx);
  
              try {
                  if (isAsync()) {
@@@ -1317,9 -1354,9 +1350,9 @@@
      }
  
      /**
-      *
+      * @return Proxy delegate.
       */
 -    public GridCacheProjectionEx delegate() {
 +    public IgniteInternalCache delegate() {
          return delegate;
      }
  
@@@ -1336,7 -1375,7 +1369,7 @@@
  
      /** {@inheritDoc} */
      @Override public void 
registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        CacheOperationContext prev = onEnter(opCtx);
  
          try {
              ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false);
@@@ -1351,7 -1390,7 +1384,7 @@@
  
      /** {@inheritDoc} */
      @Override public void 
deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        CacheOperationContext prev = onEnter(opCtx);
  
          try {
              ctx.continuousQueries().cancelJCacheQuery(lsnrCfg);
@@@ -1366,7 -1405,7 +1399,7 @@@
  
      /** {@inheritDoc} */
      @Override public Iterator<Cache.Entry<K, V>> iterator() {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        CacheOperationContext prev = onEnter(opCtx);
  
          try {
              return ctx.cache().igniteIterator();
@@@ -1378,7 -1417,7 +1411,7 @@@
  
      /** {@inheritDoc} */
      @Override protected IgniteCache<K, V> createAsyncInstance() {
-         return new IgniteCacheProxy<>(ctx, delegate, opCtx, true);
 -        return new IgniteCacheProxy<>(ctx, delegate, prj, true, lock);
++        return new IgniteCacheProxy<>(ctx, delegate, opCtx, true, lock);
      }
  
      /**
@@@ -1405,19 -1444,25 +1438,20 @@@
       * @return Projection for portable objects.
       */
      public <K1, V1> IgniteCache<K1, V1> keepPortable() {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        CacheOperationContext prev = onEnter(opCtx);
  
          try {
 -            GridCacheProjectionImpl<K1, V1> prj0 = new 
GridCacheProjectionImpl<>(
 -                (CacheProjection<K1, V1>)(prj != null ? prj : delegate),
 -                (GridCacheContext<K1, V1>)ctx,
 -                prj != null ? prj.skipStore() : false,
 -                prj != null ? prj.subjectId() : null,
 -                true,
 -                prj != null ? prj.expiry() : null);
 +            CacheOperationContext prj =
 +                new CacheOperationContext(opCtx.skipStore(), 
opCtx.subjectId(), true, opCtx.expiry());
  
              return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)ctx,
 -                prj0,
 -                prj0,
 +                (GridCacheAdapter<K1, V1>)delegate,
 +                prj,
-                 isAsync());
+                 isAsync(),
+                 lock);
          }
          finally {
-             gate.leave(prev);
+             onLeave(prev);
          }
      }
  
@@@ -1425,24 -1470,30 +1459,25 @@@
       * @return Cache with skip store enabled.
       */
      public IgniteCache<K, V> skipStore() {
-         CacheOperationContext prev = gate.enter(opCtx);
 -        GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++        CacheOperationContext prev = onEnter(opCtx);
  
          try {
 -            boolean skip = prj != null && prj.skipStore();
 +            boolean skip = opCtx != null && opCtx.skipStore();
  
              if (skip)
                  return this;
  
 -            GridCacheProjectionImpl<K, V> prj0 = new 
GridCacheProjectionImpl<>(
 -                (prj != null ? prj : delegate),
 -                ctx,
 -                true,
 -                prj != null ? prj.subjectId() : null,
 -                prj != null && prj.isKeepPortable(),
 -                prj != null ? prj.expiry() : null);
 +            CacheOperationContext prj0 =
 +                new CacheOperationContext(true, opCtx.subjectId(), 
opCtx.isKeepPortable(), opCtx.expiry());
  
              return new IgniteCacheProxy<>(ctx,
 -                prj0,
 +                delegate,
                  prj0,
-                 isAsync());
+                 isAsync(),
+                 lock);
          }
          finally {
-             gate.leave(prev);
+             onLeave(prev);
          }
      }
  
@@@ -1469,13 -1520,58 +1504,58 @@@
          return legacyProxy;
      }
  
+     /**
 -     * @param prj Projection to guard.
++     * @param opCtx Cache operation context to guard.
+      * @return Previous projection set on this thread.
+      */
 -    private GridCacheProjectionImpl<K, V> onEnter(GridCacheProjectionImpl<K, 
V> prj) {
++    private CacheOperationContext onEnter(CacheOperationContext opCtx) {
+         if (lock)
 -            return gate.enter(prj);
++            return gate.enter(opCtx);
+         else
 -            return gate.enterNoLock(prj);
++            return gate.enterNoLock(opCtx);
+     }
+ 
+     /**
+      * On enter.
+      *
+      * @return {@code True} if enter successful.
+      */
+     private boolean onEnterIfNoClose() {
+         if (lock)
+             return gate.enterIfNotClosed();
+         else
+             return gate.enterIfNotClosedNoLock();
+     }
+ 
+     /**
 -     * @param prj Projection to guard..
++     * @param opCtx Operation context to guard.
+      */
 -    private void onLeave(GridCacheProjectionImpl<K, V> prj) {
++    private void onLeave(CacheOperationContext opCtx) {
+         if (lock)
 -            gate.leave(prj);
++            gate.leave(opCtx);
+         else
 -            gate.leaveNoLock(prj);
++            gate.leaveNoLock(opCtx);
+     }
+ 
+     /**
+      * On leave.
+      */
+     private void onLeave() {
+         if (lock)
+             gate.leave();
+         else
+             gate.leaveNoLock();
+     }
+ 
      /** {@inheritDoc} */
      @Override public void writeExternal(ObjectOutput out) throws IOException {
          out.writeObject(ctx);
  
          out.writeObject(delegate);
  
 -        out.writeObject(prj);
 +        out.writeObject(opCtx);
+ 
+         out.writeBoolean(lock);
      }
  
      /** {@inheritDoc} */
@@@ -1483,11 -1579,13 +1563,13 @@@
      @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
          ctx = (GridCacheContext<K, V>)in.readObject();
  
 -        delegate = (GridCacheProjectionEx<K, V>)in.readObject();
 +        delegate = (IgniteInternalCache<K, V>)in.readObject();
  
 -        prj = (GridCacheProjectionImpl<K, V>)in.readObject();
 +        opCtx = (CacheOperationContext)in.readObject();
  
          gate = ctx.gate();
+ 
+         lock = in.readBoolean();
      }
  
      /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960b0a3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960b0a3e/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
----------------------------------------------------------------------

Reply via email to