Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-758
Conflicts:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/960b0a3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/960b0a3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/960b0a3e
Branch: refs/heads/ignite-646
Commit: 960b0a3ee91bdeb92696538b38bd51a8440ed924
Parents: 3b5fa57 357a715
Author: ivasilinets <[email protected]>
Authored: Fri Apr 17 14:30:19 2015 +0300
Committer: ivasilinets <[email protected]>
Committed: Fri Apr 17 14:30:19 2015 +0300
----------------------------------------------------------------------
.../discovery/GridDiscoveryManager.java | 28 ++
.../processors/cache/GridCacheAtomicFuture.java | 7 -
.../processors/cache/GridCacheGateway.java | 111 +++++--
.../processors/cache/IgniteCacheProxy.java | 295 +++++++++--------
.../dht/atomic/GridDhtAtomicCache.java | 51 +--
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 19 --
.../dht/atomic/GridNearAtomicUpdateFuture.java | 14 -
.../datastreamer/DataStreamerCacheUpdaters.java | 2 +-
.../datastreamer/DataStreamerImpl.java | 81 +++--
.../datastreamer/DataStreamerUpdateJob.java | 16 +-
.../dr/IgniteDrDataStreamerCacheUpdater.java | 2 -
.../internal/GridLifecycleBeanSelfTest.java | 36 +++
.../GridCacheAtomicTimeoutSelfTest.java | 314 -------------------
.../IgniteCacheAtomicMessageRecoveryTest.java | 32 ++
.../IgniteCacheMessageRecoveryAbstractTest.java | 175 +++++++++++
.../IgniteCacheTxMessageRecoveryTest.java | 32 ++
...eAtomicInvalidPartitionHandlingSelfTest.java | 14 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 3 +-
.../query/h2/sql/GridSqlFunction.java | 6 +-
.../query/h2/sql/GridSqlPlaceholder.java | 51 +++
.../query/h2/sql/GridSqlQueryParser.java | 6 +-
.../h2/sql/AbstractH2CompareQueryTest.java | 49 ++-
.../query/h2/sql/GridQueryParsingTest.java | 9 +
.../query/h2/sql/H2CompareBigQueryTest.java | 2 +-
.../processors/query/h2/sql/bigQuery.sql | 2 +-
25 files changed, 711 insertions(+), 646 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960b0a3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --cc
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index b654afd,aa73414..04c721a
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@@ -106,10 -123,8 +123,8 @@@ public class GridCacheGateway<K, V>
* @param prj Projection to guard.
* @return Previous projection set on this thread.
*/
- @Nullable public GridCacheProjectionImpl<K, V> enter(@Nullable
GridCacheProjectionImpl<K, V> prj) {
+ @Nullable public CacheOperationContext enter(@Nullable
CacheOperationContext prj) {
try {
- ctx.itHolder().checkWeakQueue();
-
GridCacheAdapter<K, V> cache = ctx.cache();
GridCachePreloader<K, V> preldr = cache != null ?
cache.preloader() : null;
@@@ -155,18 -163,39 +163,39 @@@
}
/**
+ * @param prj Projection to guard.
+ * @return Previous projection set on this thread.
+ */
+ @Nullable public GridCacheProjectionImpl<K, V> enterNoLock(@Nullable
GridCacheProjectionImpl<K, V> prj) {
+ onEnter();
+
+ if (stopped)
+ throw new IllegalStateException("Cache has been stopped: " +
ctx.name());
+
+ return setProjectionPerCall(prj);
+ }
+
+ /**
+ * Set thread local projection per call.
+ *
+ * @param prj Projection to guard.
+ * @return Previous projection set on this thread.
+ */
- private GridCacheProjectionImpl<K, V> setProjectionPerCall(@Nullable
GridCacheProjectionImpl<K, V> prj) {
- GridCacheProjectionImpl<K, V> prev = ctx.projectionPerCall();
++ private CacheOperationContext setProjectionPerCall(@Nullable
CacheOperationContext prj) {
++ CacheOperationContext prev = ctx.operationContextPerCall();
+
+ if (prev != null || prj != null)
- ctx.projectionPerCall(prj);
++ ctx.operationContextPerCall(prj);
+
+ return prev;
+ }
+
+ /**
* @param prev Previous.
*/
- public void leave(GridCacheProjectionImpl<K, V> prev) {
+ public void leave(CacheOperationContext prev) {
try {
- ctx.tm().resetContext();
- ctx.mvcc().contextReset();
-
- // Unwind eviction notifications.
- CU.unwindEvicts(ctx);
-
- // Return back previous thread local operation context per call.
- ctx.operationContextPerCall(prev);
+ leaveNoLock(prev);
}
finally {
rwLock.readUnlock();
@@@ -174,6 -203,30 +203,30 @@@
}
/**
+ * @param prev Previous.
+ */
- public void leaveNoLock(GridCacheProjectionImpl<K, V> prev) {
++ public void leaveNoLock(CacheOperationContext prev) {
+ ctx.tm().resetContext();
+ ctx.mvcc().contextReset();
+
+ // Unwind eviction notifications.
+ CU.unwindEvicts(ctx);
+
+ // Return back previous thread local projection per call.
- ctx.projectionPerCall(prev);
++ ctx.operationContextPerCall(prev);
+ }
+
+ /**
+ *
+ */
+ private void onEnter() {
+ ctx.itHolder().checkWeakQueue();
+
+ if (ctx.deploymentEnabled())
+ ctx.deploy().onEnter();
+ }
+
+ /**
*
*/
public void block() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960b0a3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --cc
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 64d19ba,244e200..91f12b3
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@@ -97,10 -101,27 +101,27 @@@ public class IgniteCacheProxy<K, V> ext
*/
public IgniteCacheProxy(
GridCacheContext<K, V> ctx,
- GridCacheProjectionEx<K, V> delegate,
- @Nullable GridCacheProjectionImpl<K, V> prj,
+ IgniteInternalCache<K, V> delegate,
+ CacheOperationContext opCtx,
boolean async
) {
- this(ctx, delegate, prj, async, true);
++ this(ctx, delegate, opCtx, async, true);
+ }
+
+ /**
+ * @param ctx Context.
+ * @param delegate Delegate.
+ * @param prj Projection.
+ * @param async Async support flag.
+ * @param lock If {@code false} does not acquire read lock on gateway
enter.
+ */
+ private IgniteCacheProxy(
+ GridCacheContext<K, V> ctx,
- GridCacheProjectionEx<K, V> delegate,
- @Nullable GridCacheProjectionImpl<K, V> prj,
++ IgniteInternalCache<K, V> delegate,
++ @Nullable CacheOperationContext opCtx,
+ boolean async,
+ boolean lock
+ ) {
super(async);
assert ctx != null;
@@@ -112,7 -133,19 +133,19 @@@
gate = ctx.gate();
- legacyProxy = new GridCacheProxyImpl<>(ctx, delegate, prj);
+ legacyProxy = new GridCacheProxyImpl<>(ctx, delegate, opCtx);
+
+ this.lock = lock;
+ }
+
+ /**
+ * Gets cache proxy which does not acquire read lock on gateway enter,
should be
+ * used only if grid read lock is externally acquired.
+ *
+ * @return Ignite cache proxy with simple gate.
+ */
+ public IgniteCacheProxy<K, V> cacheNoGate() {
- return new IgniteCacheProxy<>(ctx, delegate, prj, isAsync(), false);
++ return new IgniteCacheProxy<>(ctx, delegate, opCtx, isAsync(), false);
}
/**
@@@ -131,7 -164,7 +164,7 @@@
/** {@inheritDoc} */
@Override public CacheMetrics metrics() {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
return ctx.cache().metrics();
@@@ -143,7 -176,7 +176,7 @@@
/** {@inheritDoc} */
@Override public CacheMetrics metrics(ClusterGroup grp) {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size());
@@@ -168,7 -201,7 +201,7 @@@
/** {@inheritDoc} */
@Override public CacheMetricsMXBean mxBean() {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
return ctx.cache().mxBean();
@@@ -190,7 -223,7 +223,7 @@@
/** {@inheritDoc} */
@Nullable @Override public Entry<K, V> randomEntry() {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
return ctx.cache().randomEntry();
@@@ -202,15 -235,19 +235,15 @@@
/** {@inheritDoc} */
@Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
- CacheOperationContext prj0 = opCtx.withExpiryPolicy(plc);
+ GridCacheProjectionEx<K, V> prj0 = prj != null ?
prj.withExpiryPolicy(plc) : delegate.withExpiryPolicy(plc);
- return new IgniteCacheProxy<>(ctx, delegate, prj0, isAsync());
- return new IgniteCacheProxy<>(ctx,
- prj0,
- (GridCacheProjectionImpl<K, V>)prj0,
- isAsync(),
- lock);
++ return new IgniteCacheProxy<>(ctx, prj0,
(GridCacheProjectionImpl<K, V>)prj0, isAsync());
}
finally {
- onLeave(prev);
+ gate.leave(prev);
}
}
@@@ -222,7 -259,7 +255,7 @@@
/** {@inheritDoc} */
@Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p,
@Nullable Object... args) {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
if (isAsync())
@@@ -242,7 -279,7 +275,7 @@@
/** {@inheritDoc} */
@Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p,
@Nullable Object... args) {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
if (isAsync())
@@@ -262,7 -299,7 +295,7 @@@
/** {@inheritDoc} */
@Nullable @Override public V getAndPutIfAbsent(K key, V val) throws
CacheException {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
if (isAsync()) {
@@@ -294,7 -331,7 +327,7 @@@
/** {@inheritDoc} */
@Override public boolean isLocalLocked(K key, boolean byCurrThread) {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
return byCurrThread ? delegate.isLockedByThread(key) :
delegate.isLocked(key);
@@@ -449,7 -486,7 +482,7 @@@
@Override public <R> QueryCursor<R> query(Query<R> qry) {
A.notNull(qry, "qry");
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@@ -511,7 -548,7 +544,7 @@@
/** {@inheritDoc} */
@Override public Iterable<Entry<K, V>> localEntries(CachePeekMode...
peekModes) throws CacheException {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
return delegate.localEntries(peekModes);
@@@ -526,19 -563,19 +559,19 @@@
/** {@inheritDoc} */
@Override public QueryMetrics queryMetrics() {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
- return delegate.context().queries().metrics();
+ return delegate.queries().metrics();
}
finally {
- onLeave(prev);
+ gate.leave(prev);
}
}
/** {@inheritDoc} */
@Override public void localEvict(Collection<? extends K> keys) {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
delegate.evictAll(keys);
@@@ -550,7 -587,7 +583,7 @@@
/** {@inheritDoc} */
@Nullable @Override public V localPeek(K key, CachePeekMode... peekModes)
{
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
return delegate.localPeek(key, peekModes, null);
@@@ -566,7 -603,7 +599,7 @@@
/** {@inheritDoc} */
@Override public void localPromote(Set<? extends K> keys) throws
CacheException {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
delegate.promoteAll(keys);
@@@ -582,7 -619,7 +615,7 @@@
/** {@inheritDoc} */
@Override public int size(CachePeekMode... peekModes) throws
CacheException {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
if (isAsync()) {
@@@ -603,7 -640,7 +636,7 @@@
/** {@inheritDoc} */
@Override public int localSize(CachePeekMode... peekModes) {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
return delegate.localSize(peekModes);
@@@ -619,7 -656,7 +652,7 @@@
/** {@inheritDoc} */
@Override public V get(K key) {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
if (isAsync()) {
@@@ -642,7 -679,7 +675,7 @@@
/** {@inheritDoc} */
@Override public Map<K, V> getAll(Set<? extends K> keys) {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
if (isAsync()) {
@@@ -668,7 -705,7 +701,7 @@@
*/
public Map<K, V> getAll(Collection<? extends K> keys) {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
if (isAsync()) {
@@@ -695,7 -732,7 +728,7 @@@
* @return Entry set.
*/
public Set<Entry<K, V>> entrySetx(CacheEntryPredicate... filter) {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
return delegate.entrySetx(filter);
@@@ -707,7 -744,7 +740,7 @@@
/** {@inheritDoc} */
@Override public boolean containsKey(K key) {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
if (isAsync()) {
@@@ -725,7 -762,7 +758,7 @@@
/** {@inheritDoc} */
@Override public boolean containsKeys(Set<? extends K> keys) {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
if (isAsync()) {
@@@ -747,7 -784,7 +780,7 @@@
boolean replaceExisting,
@Nullable final CompletionListener completionLsnr
) {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
IgniteInternalFuture<?> fut = ctx.cache().loadAll(keys,
replaceExisting);
@@@ -775,7 -812,7 +808,7 @@@
/** {@inheritDoc} */
@Override public void put(K key, V val) {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
if (isAsync())
@@@ -795,7 -832,7 +828,7 @@@
/** {@inheritDoc} */
@Override public V getAndPut(K key, V val) {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
if (isAsync()) {
@@@ -818,7 -855,7 +851,7 @@@
/** {@inheritDoc} */
@Override public void putAll(Map<? extends K, ? extends V> map) {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
if (isAsync())
@@@ -838,7 -875,7 +871,7 @@@
/** {@inheritDoc} */
@Override public boolean putIfAbsent(K key, V val) {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
if (isAsync()) {
@@@ -861,7 -898,7 +894,7 @@@
/** {@inheritDoc} */
@Override public boolean remove(K key) {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
if (isAsync()) {
@@@ -884,7 -921,7 +917,7 @@@
/** {@inheritDoc} */
@Override public boolean remove(K key, V oldVal) {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
if (isAsync()) {
@@@ -907,7 -944,7 +940,7 @@@
/** {@inheritDoc} */
@Override public V getAndRemove(K key) {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
if (isAsync()) {
@@@ -930,7 -967,7 +963,7 @@@
/** {@inheritDoc} */
@Override public boolean replace(K key, V oldVal, V newVal) {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
if (isAsync()) {
@@@ -953,7 -990,7 +986,7 @@@
/** {@inheritDoc} */
@Override public boolean replace(K key, V val) {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
if (isAsync()) {
@@@ -976,7 -1013,7 +1009,7 @@@
/** {@inheritDoc} */
@Override public V getAndReplace(K key, V val) {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
if (isAsync()) {
@@@ -999,7 -1036,7 +1032,7 @@@
/** {@inheritDoc} */
@Override public void removeAll(Set<? extends K> keys) {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
if (isAsync())
@@@ -1018,7 -1055,7 +1051,7 @@@
/** {@inheritDoc} */
@Override public void removeAll() {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
if (isAsync())
@@@ -1036,7 -1073,7 +1069,7 @@@
/** {@inheritDoc} */
@Override public void clear(K key) {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
if (isAsync())
@@@ -1054,7 -1091,7 +1087,7 @@@
/** {@inheritDoc} */
@Override public void clearAll(Set<? extends K> keys) {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
if (isAsync())
@@@ -1072,7 -1109,7 +1105,7 @@@
/** {@inheritDoc} */
@Override public void clear() {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
if (isAsync())
@@@ -1090,7 -1127,7 +1123,7 @@@
/** {@inheritDoc} */
@Override public void localClear(K key) {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
delegate.clearLocally(key);
@@@ -1102,7 -1139,7 +1135,7 @@@
/** {@inheritDoc} */
@Override public void localClearAll(Set<? extends K> keys) {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
for (K key : keys)
@@@ -1117,7 -1154,7 +1150,7 @@@
@Override public <T> T invoke(K key, EntryProcessor<K, V, T>
entryProcessor, Object... args)
throws EntryProcessorException {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
if (isAsync()) {
@@@ -1155,7 -1192,7 +1188,7 @@@
@Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T>
entryProcessor, Object... args)
throws EntryProcessorException {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
if (isAsync()) {
@@@ -1194,7 -1231,7 +1227,7 @@@
EntryProcessor<K, V, T> entryProcessor,
Object...
args) {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
if (isAsync()) {
@@@ -1219,7 -1256,7 +1252,7 @@@
CacheEntryProcessor<K, V, T> entryProcessor,
Object... args) {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
if (isAsync()) {
@@@ -1244,7 -1281,7 +1277,7 @@@
Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
Object... args) {
try {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
if (isAsync()) {
@@@ -1317,9 -1354,9 +1350,9 @@@
}
/**
- *
+ * @return Proxy delegate.
*/
- public GridCacheProjectionEx delegate() {
+ public IgniteInternalCache delegate() {
return delegate;
}
@@@ -1336,7 -1375,7 +1369,7 @@@
/** {@inheritDoc} */
@Override public void
registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false);
@@@ -1351,7 -1390,7 +1384,7 @@@
/** {@inheritDoc} */
@Override public void
deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
ctx.continuousQueries().cancelJCacheQuery(lsnrCfg);
@@@ -1366,7 -1405,7 +1399,7 @@@
/** {@inheritDoc} */
@Override public Iterator<Cache.Entry<K, V>> iterator() {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
return ctx.cache().igniteIterator();
@@@ -1378,7 -1417,7 +1411,7 @@@
/** {@inheritDoc} */
@Override protected IgniteCache<K, V> createAsyncInstance() {
- return new IgniteCacheProxy<>(ctx, delegate, opCtx, true);
- return new IgniteCacheProxy<>(ctx, delegate, prj, true, lock);
++ return new IgniteCacheProxy<>(ctx, delegate, opCtx, true, lock);
}
/**
@@@ -1405,19 -1444,25 +1438,20 @@@
* @return Projection for portable objects.
*/
public <K1, V1> IgniteCache<K1, V1> keepPortable() {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
- GridCacheProjectionImpl<K1, V1> prj0 = new
GridCacheProjectionImpl<>(
- (CacheProjection<K1, V1>)(prj != null ? prj : delegate),
- (GridCacheContext<K1, V1>)ctx,
- prj != null ? prj.skipStore() : false,
- prj != null ? prj.subjectId() : null,
- true,
- prj != null ? prj.expiry() : null);
+ CacheOperationContext prj =
+ new CacheOperationContext(opCtx.skipStore(),
opCtx.subjectId(), true, opCtx.expiry());
return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)ctx,
- prj0,
- prj0,
+ (GridCacheAdapter<K1, V1>)delegate,
+ prj,
- isAsync());
+ isAsync(),
+ lock);
}
finally {
- gate.leave(prev);
+ onLeave(prev);
}
}
@@@ -1425,24 -1470,30 +1459,25 @@@
* @return Cache with skip store enabled.
*/
public IgniteCache<K, V> skipStore() {
- CacheOperationContext prev = gate.enter(opCtx);
- GridCacheProjectionImpl<K, V> prev = onEnter(prj);
++ CacheOperationContext prev = onEnter(opCtx);
try {
- boolean skip = prj != null && prj.skipStore();
+ boolean skip = opCtx != null && opCtx.skipStore();
if (skip)
return this;
- GridCacheProjectionImpl<K, V> prj0 = new
GridCacheProjectionImpl<>(
- (prj != null ? prj : delegate),
- ctx,
- true,
- prj != null ? prj.subjectId() : null,
- prj != null && prj.isKeepPortable(),
- prj != null ? prj.expiry() : null);
+ CacheOperationContext prj0 =
+ new CacheOperationContext(true, opCtx.subjectId(),
opCtx.isKeepPortable(), opCtx.expiry());
return new IgniteCacheProxy<>(ctx,
- prj0,
+ delegate,
prj0,
- isAsync());
+ isAsync(),
+ lock);
}
finally {
- gate.leave(prev);
+ onLeave(prev);
}
}
@@@ -1469,13 -1520,58 +1504,58 @@@
return legacyProxy;
}
+ /**
- * @param prj Projection to guard.
++ * @param opCtx Cache operation context to guard.
+ * @return Previous projection set on this thread.
+ */
- private GridCacheProjectionImpl<K, V> onEnter(GridCacheProjectionImpl<K,
V> prj) {
++ private CacheOperationContext onEnter(CacheOperationContext opCtx) {
+ if (lock)
- return gate.enter(prj);
++ return gate.enter(opCtx);
+ else
- return gate.enterNoLock(prj);
++ return gate.enterNoLock(opCtx);
+ }
+
+ /**
+ * On enter.
+ *
+ * @return {@code True} if enter successful.
+ */
+ private boolean onEnterIfNoClose() {
+ if (lock)
+ return gate.enterIfNotClosed();
+ else
+ return gate.enterIfNotClosedNoLock();
+ }
+
+ /**
- * @param prj Projection to guard..
++ * @param opCtx Operation context to guard.
+ */
- private void onLeave(GridCacheProjectionImpl<K, V> prj) {
++ private void onLeave(CacheOperationContext opCtx) {
+ if (lock)
- gate.leave(prj);
++ gate.leave(opCtx);
+ else
- gate.leaveNoLock(prj);
++ gate.leaveNoLock(opCtx);
+ }
+
+ /**
+ * On leave.
+ */
+ private void onLeave() {
+ if (lock)
+ gate.leave();
+ else
+ gate.leaveNoLock();
+ }
+
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(ctx);
out.writeObject(delegate);
- out.writeObject(prj);
+ out.writeObject(opCtx);
+
+ out.writeBoolean(lock);
}
/** {@inheritDoc} */
@@@ -1483,11 -1579,13 +1563,13 @@@
@Override public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
ctx = (GridCacheContext<K, V>)in.readObject();
- delegate = (GridCacheProjectionEx<K, V>)in.readObject();
+ delegate = (IgniteInternalCache<K, V>)in.readObject();
- prj = (GridCacheProjectionImpl<K, V>)in.readObject();
+ opCtx = (CacheOperationContext)in.readObject();
gate = ctx.gate();
+
+ lock = in.readBoolean();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960b0a3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960b0a3e/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
----------------------------------------------------------------------