http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index c2a00e6,0435b92..94bac05 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@@ -81,11 -80,8 +80,8 @@@ public final class GridNearGetFuture<K private GridCacheVersion ver; /** Transaction. */ - private IgniteTxLocalEx<K, V> tx; + private IgniteTxLocalEx tx; - /** Logger. */ - private IgniteLogger log; - /** Trackable flag. */ private boolean trackable; @@@ -490,13 -494,14 +480,13 @@@ } if (v != null && !reload) { - K key0 = key; + K key0 = key.value(cctx.cacheObjectContext(), true); + V val0 = v.value(cctx.cacheObjectContext(), true); - if (cctx.portableEnabled()) { - v = (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable); - key0 = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable); - } + val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable); + key0 = (K)cctx.unwrapPortableIfNeeded(key0, !deserializePortable); - add(new GridFinishedFuture<>(cctx.kernalContext(), Collections.singletonMap(key0, val0))); - add(new GridFinishedFuture<>(Collections.singletonMap(key0, v))); ++ add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); } else { if (primary == null) @@@ -682,12 -683,7 +662,10 @@@ * @param savedVers Saved entry versions. * @param topVer Topology version. */ - MiniFuture(ClusterNode node, LinkedHashMap<K, Boolean> keys, Map<K, GridCacheVersion> savedVers, long topVer) { + MiniFuture(ClusterNode node, + LinkedHashMap<KeyCacheObject, Boolean> keys, + Map<KeyCacheObject, GridCacheVersion> savedVers, + long topVer) { - super(cctx.kernalContext()); - this.node = node; this.keys = keys; this.savedVers = savedVers;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 0ffc465,88095d7..af50959 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@@ -49,10 -48,7 +48,7 @@@ import static org.apache.ignite.events. * Cache lock future. */ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean> - implements GridCacheMvccFuture<K, V, Boolean> { + implements GridCacheMvccFuture<Boolean> { - /** */ - private static final long serialVersionUID = 0L; - /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); @@@ -92,12 -91,8 +91,8 @@@ /** Lock timeout. */ private long timeout; - /** Logger. */ - @GridToStringExclude - private IgniteLogger log; - /** Filter. */ - private IgnitePredicate<Cache.Entry<K, V>>[] filter; + private CacheEntryPredicate[] filter; /** Transaction. */ @GridToStringExclude @@@ -961,14 -950,12 +950,12 @@@ if (log.isDebugEnabled()) log.debug("Before locally locking near request: " + req); - IgniteInternalFuture<GridNearLockResponse<K, V>> fut = dht().lockAllAsync(cctx, cctx.localNode(), req, filter); + IgniteInternalFuture<GridNearLockResponse> fut = dht().lockAllAsync(cctx, cctx.localNode(), req, filter); // Add new future. - add(new GridEmbeddedFuture( - cctx.kernalContext(), - fut, + add(new GridEmbeddedFuture<>( - new C2<GridNearLockResponse<K, V>, Exception, Boolean>() { - @Override public Boolean apply(GridNearLockResponse<K, V> res, Exception e) { + new C2<GridNearLockResponse, Exception, Boolean>() { + @Override public Boolean apply(GridNearLockResponse res, Exception e) { if (CU.isLockTimeoutOrCancelled(e) || (res != null && CU.isLockTimeoutOrCancelled(res.error()))) return false; @@@ -1254,10 -1234,8 +1231,8 @@@ * @param keys Keys. * @param mappings Mappings to proceed. */ - MiniFuture(ClusterNode node, Collection<K> keys, - ConcurrentLinkedDeque8<GridNearLockMapping<K, V>> mappings) { + MiniFuture(ClusterNode node, Collection<KeyCacheObject> keys, + ConcurrentLinkedDeque8<GridNearLockMapping> mappings) { - super(cctx.kernalContext()); - this.node = node; this.keys = keys; this.mappings = mappings; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index ee80a00,ff595c0..c718a67 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@@ -109,12 -109,9 +109,12 @@@ public class GridNearTransactionalCache ctx.checkSecurity(GridSecurityPermission.CACHE_READ); if (F.isEmpty(keys)) - return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap()); + return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); - IgniteTxLocalAdapter<K, V> tx = ctx.tm().threadLocalTx(); + if (keyCheck) + validateCacheKeys(keys); + + IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(); if (tx != null && !tx.implicit() && !skipTx) { return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index a8804dc,dd5d07d..d338352 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@@ -421,21 -408,12 +408,12 @@@ public final class GridNearTxFinishFutu /** Keys. */ @GridToStringInclude - private GridDistributedTxMapping<K, V> m; + private GridDistributedTxMapping m; /** - * Empty constructor required for {@link Externalizable}. - */ - public MiniFuture() { - // No-op. - } - - /** * @param m Mapping. */ - MiniFuture(GridDistributedTxMapping<K, V> m) { + MiniFuture(GridDistributedTxMapping m) { - super(cctx.kernalContext()); - this.m = m; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 6669773,9ada718..4477587 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@@ -761,11 -759,11 +760,11 @@@ public class GridNearTxLocal extends Gr cctx.mvcc().addFuture(fut); - IgniteInternalFuture<IgniteInternalTx<K, V>> prepareFut = prepFut.get(); + IgniteInternalFuture<IgniteInternalTx> prepareFut = prepFut.get(); - prepareFut.listenAsync(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { - prepareFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx<K, V>>>() { - @Override public void apply(IgniteInternalFuture<IgniteInternalTx<K, V>> f) { - GridNearTxFinishFuture<K, V> fut0 = commitFut.get(); ++ prepareFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { + @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) { + GridNearTxFinishFuture fut0 = commitFut.get(); try { // Make sure that here are no exceptions. @@@ -832,8 -830,8 +831,8 @@@ } } else { - prepFut.listenAsync(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { - prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx<K, V>>>() { - @Override public void apply(IgniteInternalFuture<IgniteInternalTx<K, V>> f) { ++ prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { + @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) { try { // Check for errors in prepare future. f.get(); @@@ -966,10 -964,10 +965,10 @@@ if (prep != null) return (IgniteInternalFuture<IgniteInternalTx>)(IgniteInternalFuture)prep; - return new GridFinishedFuture<IgniteInternalTx>(cctx.kernalContext(), this); + return new GridFinishedFuture<IgniteInternalTx>(this); } - final GridDhtTxFinishFuture<K, V> fut = new GridDhtTxFinishFuture<>(cctx, this, /*commit*/true); + final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*commit*/true); cctx.mvcc().addFuture(fut); @@@ -995,8 -993,8 +994,8 @@@ } } else - prep.listenAsync(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { - prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx<K, V>>>() { - @Override public void apply(IgniteInternalFuture<IgniteInternalTx<K, V>> f) { ++ prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { + @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) { try { f.get(); // Check for errors of a parent future. @@@ -1048,8 -1046,8 +1047,8 @@@ fut.finish(); } else - prep.listenAsync(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { - prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx<K, V>>>() { - @Override public void apply(IgniteInternalFuture<IgniteInternalTx<K, V>> f) { ++ prep.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { + @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) { try { f.get(); // Check for errors of a parent future. } @@@ -1077,13 -1075,13 +1076,13 @@@ checkValid(); } catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(cctx.kernalContext(), e); + return new GridFinishedFuture<>(e); } - final GridCacheReturn<V> ret = new GridCacheReturn<>(false); + final GridCacheReturn ret = new GridCacheReturn(localResult(), false); if (F.isEmpty(keys)) - return new GridFinishedFuture<>(cctx.kernalContext(), ret); + return new GridFinishedFuture<>(ret); init(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index 3a86888,cc5fcf4..778570a --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@@ -50,11 -50,8 +49,8 @@@ import static org.apache.ignite.transac /** * */ -public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFuture<IgniteInternalTx<K, V>> - implements GridCacheMvccFuture<K, V, IgniteInternalTx<K, V>> { +public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFuture<IgniteInternalTx> + implements GridCacheMvccFuture<IgniteInternalTx> { - /** */ - private static final long serialVersionUID = 0L; - /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); @@@ -66,11 -66,8 +65,8 @@@ /** Transaction. */ @GridToStringExclude - private GridNearTxLocal<K, V> tx; + private GridNearTxLocal tx; - /** Logger. */ - private IgniteLogger log; - /** Error. */ @GridToStringExclude private AtomicReference<Throwable> err = new AtomicReference<>(null); @@@ -82,16 -79,9 +78,9 @@@ private GridDhtTxMapping<K, V> txMapping; /** */ - private Collection<IgniteTxKey<K>> lockKeys = new GridConcurrentHashSet<>(); + private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>(); /** - * Empty constructor required for {@link Externalizable}. - */ - public GridNearTxPrepareFuture() { - // No-op. - } - - /** * @param cctx Context. * @param tx Transaction. */ @@@ -828,11 -821,8 +820,8 @@@ * Mini-future for get operations. Mini-futures are only waiting on a single * node as opposed to multiple nodes. */ - private class MiniFuture extends GridFutureAdapter<IgniteInternalTx<K, V>> { + private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> { /** */ - private static final long serialVersionUID = 0L; - - /** */ private final IgniteUuid futId = IgniteUuid.randomUuid(); /** Keys. */ @@@ -843,23 -833,16 +832,16 @@@ private AtomicBoolean rcvRes = new AtomicBoolean(false); /** Mappings to proceed prepare. */ - private ConcurrentLinkedDeque8<GridDistributedTxMapping<K, V>> mappings; + private ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings; /** - * Empty constructor required for {@link Externalizable}. - */ - public MiniFuture() { - // No-op. - } - - /** * @param m Mapping. * @param mappings Queue of mappings to proceed with. */ - MiniFuture(GridDistributedTxMapping m, - ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings) { - super(cctx.kernalContext()); - + MiniFuture( - GridDistributedTxMapping<K, V> m, - ConcurrentLinkedDeque8<GridDistributedTxMapping<K, V>> mappings ++ GridDistributedTxMapping m, ++ ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings + ) { this.m = m; this.mappings = mappings; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java index 5eb4284,5833270..f493dbc --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java @@@ -131,12 -124,10 +131,12 @@@ public class GridLocalCache<K, V> exten * @param filter Filter. * @return Future. */ - public IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout, - @Nullable IgniteTxLocalEx<K, V> tx, IgnitePredicate<Cache.Entry<K, V>>[] filter) { + public IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys, + long timeout, + @Nullable IgniteTxLocalEx tx, + CacheEntryPredicate[] filter) { if (F.isEmpty(keys)) - return new GridFinishedFuture<>(ctx.kernalContext(), true); + return new GridFinishedFuture<>(true); GridLocalLockFuture<K, V> fut = new GridLocalLockFuture<>(ctx, keys, tx, this, timeout, filter); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java index 8a8fb5c,320a443..dad40fd --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java @@@ -38,10 -37,7 +37,7 @@@ import java.util.concurrent.atomic.* * Cache lock future. */ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean> - implements GridCacheMvccFuture<K, V, Boolean> { + implements GridCacheMvccFuture<Boolean> { - /** */ - private static final long serialVersionUID = 0L; - /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); @@@ -77,15 -76,11 +76,11 @@@ /** Lock timeout. */ private long timeout; - /** Logger. */ - @GridToStringExclude - private IgniteLogger log; - /** Filter. */ - private IgnitePredicate<Cache.Entry<K, V>>[] filter; + private CacheEntryPredicate[] filter; /** Transaction. */ - private IgniteTxLocalEx<K, V> tx; + private IgniteTxLocalEx tx; /** Trackable flag. */ private boolean trackable = true; @@@ -107,13 -95,11 +95,11 @@@ */ GridLocalLockFuture( GridCacheContext<K, V> cctx, - Collection<? extends K> keys, - IgniteTxLocalEx<K, V> tx, + Collection<KeyCacheObject> keys, + IgniteTxLocalEx tx, GridLocalCache<K, V> cache, long timeout, - IgnitePredicate<Cache.Entry<K, V>>[] filter) { + CacheEntryPredicate[] filter) { - super(cctx.kernalContext()); - assert keys != null; assert cache != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTx.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTx.java index 35b0a12,da7f73b..33e6174 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTx.java @@@ -113,10 -111,10 +113,10 @@@ class GridLocalTx extends IgniteTxLocal try { prepare(); - return new GridFinishedFuture<IgniteInternalTx>(cctx.kernalContext(), this); - return new GridFinishedFuture<IgniteInternalTx<K, V>>(this); ++ return new GridFinishedFuture<IgniteInternalTx>(this); } catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(cctx.kernalContext(), e); + return new GridFinishedFuture<>(e); } } @@@ -155,10 -153,10 +155,10 @@@ catch (IgniteCheckedException e) { state(UNKNOWN); - return new GridFinishedFuture<>(cctx.kernalContext(), e); + return new GridFinishedFuture<>(e); } - GridLocalTxFuture<K, V> fut = this.fut.get(); + GridLocalTxFuture fut = this.fut.get(); if (fut == null) { if (this.fut.compareAndSet(null, fut = new GridLocalTxFuture<>(cctx, this))) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java index bc248aa,c80292c..daf22c6 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java @@@ -37,11 -36,8 +36,8 @@@ import static org.apache.ignite.transac /** * Replicated cache transaction future. */ -final class GridLocalTxFuture<K, V> extends GridFutureAdapter<IgniteInternalTx<K, V>> - implements GridCacheMvccFuture<K, V, IgniteInternalTx<K, V>> { +final class GridLocalTxFuture<K, V> extends GridFutureAdapter<IgniteInternalTx> + implements GridCacheMvccFuture<IgniteInternalTx> { - /** */ - private static final long serialVersionUID = 0L; - /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); @@@ -82,9 -70,7 +70,7 @@@ */ GridLocalTxFuture( GridCacheSharedContext<K, V> cctx, - GridLocalTx<K, V> tx) { + GridLocalTx tx) { - super(cctx.kernalContext()); - assert cctx != null; assert tx != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 8fb8ab4,6e0a85f..248515c --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@@ -1532,8 -1553,8 +1532,8 @@@ public class GridLocalAtomicCache<K, V TransactionIsolation isolation, boolean invalidate, long accessTtl, - IgnitePredicate<Cache.Entry<K, V>>[] filter) { + CacheEntryPredicate[] filter) { - return new GridFinishedFutureEx<>(new UnsupportedOperationException("Locks are not supported for " + + return new GridFinishedFuture<>(new UnsupportedOperationException("Locks are not supported for " + "CacheAtomicityMode.ATOMIC mode (use CacheAtomicityMode.TRANSACTIONAL instead)")); } @@@ -1541,8 -1562,8 +1541,8 @@@ @SuppressWarnings("unchecked") @Override public IgniteInternalFuture<Boolean> lockAllAsync(@Nullable Collection<? extends K> keys, long timeout, - @Nullable IgnitePredicate<Cache.Entry<K, V>>... filter) { + @Nullable CacheEntryPredicate... filter) { - return new GridFinishedFutureEx<>(new UnsupportedOperationException("Locks are not supported for " + + return new GridFinishedFuture<>(new UnsupportedOperationException("Locks are not supported for " + "CacheAtomicityMode.ATOMIC mode (use CacheAtomicityMode.TRANSACTIONAL instead)")); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 2b7ecb6,b69c6d9..ecdc939 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@@ -167,19 -167,18 +167,18 @@@ public class IgniteTxHandler * @param req Near prepare request. * @return Prepare future. */ - private IgniteInternalFuture<IgniteInternalTx<K, V>> prepareColocatedTx( - final GridNearTxLocal<K, V> locTx, - final GridNearTxPrepareRequest<K, V> req, - final IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb + private IgniteInternalFuture<IgniteInternalTx> prepareColocatedTx( + final GridNearTxLocal locTx, + final GridNearTxPrepareRequest req, + final IgniteInClosure<GridNearTxPrepareResponse> completeCb ) { - IgniteInternalFuture<Object> fut = new GridFinishedFutureEx<>(); // TODO force preload keys. + IgniteInternalFuture<Object> fut = new GridFinishedFuture<>(); // TODO force preload keys. return new GridEmbeddedFuture<>( - ctx.kernalContext(), fut, - new C2<Object, Exception, IgniteInternalFuture<IgniteInternalTx<K, V>>>() { - @Override public IgniteInternalFuture<IgniteInternalTx<K, V>> apply(Object o, Exception ex) { + new C2<Object, Exception, IgniteInternalFuture<IgniteInternalTx>>() { + @Override public IgniteInternalFuture<IgniteInternalTx> apply(Object o, Exception ex) { if (ex != null) throw new GridClosureException(ex); @@@ -240,10 -239,10 +239,10 @@@ e.unmarshal(ctx, false, ctx.deploy().globalLoader()); } catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(ctx.kernalContext(), e); + return new GridFinishedFuture<>(e); } - GridDhtTxLocal<K, V> tx; + GridDhtTxLocal tx; GridCacheVersion mappedVer = ctx.tm().mappedVersion(req.version()); @@@ -324,10 -323,10 +323,10 @@@ } } - final GridDhtTxLocal<K, V> tx0 = tx; + final GridDhtTxLocal tx0 = tx; - fut.listenAsync(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { - fut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx<K, V>>>() { - @Override public void apply(IgniteInternalFuture<IgniteInternalTx<K, V>> txFut) { ++ fut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { + @Override public void apply(IgniteInternalFuture<IgniteInternalTx> txFut) { try { txFut.get(); } @@@ -343,7 -342,7 +342,7 @@@ return fut; } else - return new GridFinishedFuture<>(ctx.kernalContext(), (IgniteInternalTx)null); - return new GridFinishedFuture<>((IgniteInternalTx<K, V>)null); ++ return new GridFinishedFuture<>((IgniteInternalTx)null); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index fa0f293,5d39822..bc3744e --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@@ -360,20 -353,20 +360,20 @@@ public abstract class IgniteTxLocalAdap if (!async) { try { if (!readThrough || !cacheCtx.readThrough()) { - for (K key : keys) + for (KeyCacheObject key : keys) c.apply(key, null); - return new GridFinishedFuture<>(cctx.kernalContext(), false); + return new GridFinishedFuture<>(false); } - return new GridFinishedFuture<>(cctx.kernalContext(), + return new GridFinishedFuture<>( cacheCtx.store().loadAllFromStore(this, keys, c)); } catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(cctx.kernalContext(), e); + return new GridFinishedFuture<>(e); } } - else + else { return cctx.kernalContext().closure().callLocalSafe( new GPC<Boolean>() { @Override public Boolean call() throws Exception { @@@ -1408,188 -1411,177 +1408,194 @@@ if (log.isDebugEnabled()) log.debug("Loading missed values for missed map: " + missedMap); - final Collection<K> loaded = new HashSet<>(); + final Collection<KeyCacheObject> loaded = new HashSet<>(); - return new GridEmbeddedFuture<>(cctx.kernalContext(), - loadMissing( - cacheCtx, - true, false, missedMap.keySet(), deserializePortable, skipVals, new CI2<KeyCacheObject, Object>() { - /** */ - private GridCacheVersion nextVer; + return new GridEmbeddedFuture<>( + new C2<Boolean, Exception, Map<K, V>>() { + @Override public Map<K, V> apply(Boolean b, Exception e) { + if (e != null) { + setRollbackOnly(); - @Override public void apply(KeyCacheObject key, Object val) { - if (isRollbackOnly()) { - if (log.isDebugEnabled()) - log.debug("Ignoring loaded value for read because transaction was rolled back: " + - IgniteTxLocalAdapter.this); + throw new GridClosureException(e); + } + + if (!b && !readCommitted()) { + // There is no store - we must mark the entries. - for (K key : missedMap.keySet()) { - IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key)); ++ for (KeyCacheObject key : missedMap.keySet()) { ++ IgniteTxEntry txEntry = entry(cacheCtx.txKey(key)); - return; + if (txEntry != null) + txEntry.markValid(); + } } - GridCacheVersion ver = missedMap.get(key); + if (readCommitted()) { - Collection<K> notFound = new HashSet<>(missedMap.keySet()); ++ Collection<KeyCacheObject> notFound = new HashSet<>(missedMap.keySet()); - if (ver == null) { - if (log.isDebugEnabled()) - log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']'); + notFound.removeAll(loaded); - return; - } + // In read-committed mode touch entries that have just been read. - for (K key : notFound) { - IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key)); ++ for (KeyCacheObject key : notFound) { ++ IgniteTxEntry txEntry = entry(cacheCtx.txKey(key)); - CacheObject cacheVal = cacheCtx.toCacheObject(val); - GridCacheEntryEx<K, V> entry = txEntry == null ? cacheCtx.cache().peekEx(key) : ++ GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().peekEx(key) : + txEntry.cached(); - CacheObject visibleVal = cacheVal; + if (entry != null) + cacheCtx.evicts().touch(entry, topologyVersion()); + } + } - IgniteTxKey txKey = cacheCtx.txKey(key); + return map; + } + }, + loadMissing( + cacheCtx, + true, + false, + missedMap.keySet(), + deserializePortable, + skipVals, - new CI2<K, V>() { ++ new CI2<KeyCacheObject, Object>() { + /** */ + private GridCacheVersion nextVer; - IgniteTxEntry txEntry = entry(txKey); - @Override public void apply(K key, V val) { ++ @Override public void apply(KeyCacheObject key, Object val) { + if (isRollbackOnly()) { + if (log.isDebugEnabled()) + log.debug("Ignoring loaded value for read because transaction was rolled back: " + + IgniteTxLocalAdapter.this); - if (txEntry != null) { - if (!readCommitted()) - txEntry.readValue(cacheVal); + return; + } - if (!F.isEmpty(txEntry.entryProcessors())) - visibleVal = txEntry.applyEntryProcessors(visibleVal); - } + GridCacheVersion ver = missedMap.get(key); - // In pessimistic mode we hold the lock, so filter validation - // should always be valid. - if (pessimistic()) - ver = null; + if (ver == null) { + if (log.isDebugEnabled()) + log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']'); - // Initialize next version. - if (nextVer == null) - nextVer = cctx.versions().next(topologyVersion()); + return; + } - while (true) { - assert txEntry != null || readCommitted() || groupLock() || skipVals; - V visibleVal = val; ++ CacheObject cacheVal = cacheCtx.toCacheObject(val); + - GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached(); ++ CacheObject visibleVal = cacheVal; - try { - // Must initialize to true since even if filter didn't pass, - // we still record the transaction value. - boolean set; - IgniteTxKey<K> txKey = cacheCtx.txKey(key); ++ IgniteTxKey txKey = cacheCtx.txKey(key); - try { - set = e.versionedValue(cacheVal, ver, nextVer); - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in transaction getAll method " + - "(will try again): " + e); - IgniteTxEntry<K, V> txEntry = entry(txKey); ++ IgniteTxEntry txEntry = entry(txKey); - if (pessimistic() && !readCommitted() && !isRollbackOnly() && - (!groupLock() || F.eq(e.key(), groupLockKey()))) { - U.error(log, "Inconsistent transaction state (entry got removed while " + - "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]"); + if (txEntry != null) { + if (!readCommitted()) - txEntry.readValue(val); ++ txEntry.readValue(cacheVal); - setRollbackOnly(); + if (!F.isEmpty(txEntry.entryProcessors())) + visibleVal = txEntry.applyEntryProcessors(visibleVal); + } - return; - } + // In pessimistic mode we hold the lock, so filter validation + // should always be valid. + if (pessimistic()) + ver = null; - if (txEntry != null) - txEntry.cached(entryEx(cacheCtx, txKey)); + // Initialize next version. + if (nextVer == null) + nextVer = cctx.versions().next(topologyVersion()); - continue; // While loop. - } + while (true) { + assert txEntry != null || readCommitted() || groupLock() || skipVals; - // In pessimistic mode, we should always be able to set. - assert set || !pessimistic(); - GridCacheEntryEx<K, V> e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached(); ++ GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached(); - if (readCommitted() || groupLock() || skipVals) { - cacheCtx.evicts().touch(e, topologyVersion()); + try { + // Must initialize to true since even if filter didn't pass, + // we still record the transaction value. + boolean set; - if (visibleVal != null) { - cacheCtx.addResult(map, - key, - visibleVal, - skipVals, - keepCacheObjects, - deserializePortable, - false); + try { - set = e.versionedValue(val, ver, nextVer); ++ set = e.versionedValue(cacheVal, ver, nextVer); } - } - else { - assert txEntry != null; + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in transaction getAll method " + + "(will try again): " + e); - txEntry.setAndMarkValid(cacheVal); + if (pessimistic() && !readCommitted() && !isRollbackOnly() && + (!groupLock() || F.eq(e.key(), groupLockKey()))) { + U.error(log, "Inconsistent transaction state (entry got removed while " + + "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]"); - if (visibleVal != null) { - cacheCtx.addResult(map, - key, - visibleVal, - skipVals, - keepCacheObjects, - deserializePortable, - false); - } - } + setRollbackOnly(); - loaded.add(key); + return; + } - if (log.isDebugEnabled()) - log.debug("Set value loaded from store into entry from transaction [set=" + set + - ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']'); + if (txEntry != null) + txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes()); - break; // While loop. - } - catch (IgniteCheckedException ex) { - throw new IgniteException("Failed to put value for cache entry: " + e, ex); - } - } - } - }), - new C2<Boolean, Exception, Map<K, V>>() { - @Override public Map<K, V> apply(Boolean b, Exception e) { - if (e != null) { - setRollbackOnly(); + continue; // While loop. + } - throw new GridClosureException(e); - } + // In pessimistic mode, we should always be able to set. + assert set || !pessimistic(); - if (!b && !readCommitted()) { - // There is no store - we must mark the entries. - for (KeyCacheObject key : missedMap.keySet()) { - IgniteTxEntry txEntry = entry(cacheCtx.txKey(key)); + if (readCommitted() || groupLock() || skipVals) { + cacheCtx.evicts().touch(e, topologyVersion()); - if (txEntry != null) - txEntry.markValid(); - } - } - if (visibleVal != null) - map.put(key, (V)CU.skipValue(visibleVal, skipVals)); ++ if (visibleVal != null) { ++ cacheCtx.addResult(map, ++ key, ++ visibleVal, ++ skipVals, ++ keepCacheObjects, ++ deserializePortable, ++ false); ++ } + } + else { + assert txEntry != null; - if (readCommitted()) { - Collection<KeyCacheObject> notFound = new HashSet<>(missedMap.keySet()); - txEntry.setAndMarkValid(val); ++ txEntry.setAndMarkValid(cacheVal); - notFound.removeAll(loaded); - if (visibleVal != null) - map.put(key, visibleVal); ++ if (visibleVal != null) { ++ cacheCtx.addResult(map, ++ key, ++ visibleVal, ++ skipVals, ++ keepCacheObjects, ++ deserializePortable, ++ false); ++ } + } - // In read-committed mode touch entries that have just been read. - for (KeyCacheObject key : notFound) { - IgniteTxEntry txEntry = entry(cacheCtx.txKey(key)); + loaded.add(key); - GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().peekEx(key) : - txEntry.cached(); + if (log.isDebugEnabled()) + log.debug("Set value loaded from store into entry from transaction [set=" + set + + ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']'); - if (entry != null) - cacheCtx.evicts().touch(entry, topologyVersion()); + break; // While loop. + } + catch (IgniteCheckedException ex) { + throw new IgniteException("Failed to put value for cache entry: " + e, ex); + } } } - - return map; - } - }); + }) + ); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Map<K, V>> getAllAsync( - final GridCacheContext<K, V> cacheCtx, - Collection<? extends K> keys, - @Nullable GridCacheEntryEx<K, V> cached, + @Override public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync( + final GridCacheContext cacheCtx, + Collection<KeyCacheObject> keys, + @Nullable GridCacheEntryEx cached, final boolean deserializePortable, - final boolean skipVals) { + final boolean skipVals, + final boolean keepCacheObjects) { if (F.isEmpty(keys)) - return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap()); + return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); init(); @@@ -1616,11 -1608,10 +1622,11 @@@ missed, keysCnt, deserializePortable, - skipVals); + skipVals, + keepCacheObjects); if (single && missed.isEmpty()) - return new GridFinishedFuture<>(cctx.kernalContext(), retMap); + return new GridFinishedFuture<>(retMap); // Handle locks. if (pessimistic() && !readCommitted() && !groupLock() && !skipVals) { @@@ -1730,17 -1715,10 +1736,17 @@@ } } - if (!missed.isEmpty() && (cacheCtx.isReplicated() || cacheCtx.isLocal())) - return checkMissed(cacheCtx, retMap, missed, null, deserializePortable, skipVals); + if (!missed.isEmpty() && (cacheCtx.isReplicated() || cacheCtx.isLocal())) { + return checkMissed(cacheCtx, + retMap, + missed, + null, + deserializePortable, + skipVals, + keepCacheObjects); + } - return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap()); + return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); } }; @@@ -1787,23 -1764,16 +1792,22 @@@ if (!missed.isEmpty()) { if (!readCommitted()) - for (Iterator<K> it = missed.keySet().iterator(); it.hasNext(); ) - if (retMap.containsKey(it.next())) + for (Iterator<KeyCacheObject> it = missed.keySet().iterator(); it.hasNext(); ) { + KeyCacheObject cacheKey = it.next(); + + K keyVal = + (K)(keepCacheObjects ? cacheKey : cacheKey.value(cacheCtx.cacheObjectContext(), false)); + + if (retMap.containsKey(keyVal)) it.remove(); + } if (missed.isEmpty()) - return new GridFinishedFuture<>(cctx.kernalContext(), retMap); + return new GridFinishedFuture<>(retMap); return new GridEmbeddedFuture<>( - cctx.kernalContext(), // First future. - checkMissed(cacheCtx, retMap, missed, redos, deserializePortable, skipVals), + checkMissed(cacheCtx, retMap, missed, redos, deserializePortable, skipVals, keepCacheObjects), // Closure that returns another future, based on result from first. new PMC<Map<K, V>>() { @Override public IgniteInternalFuture<Map<K, V>> postMiss(Map<K, V> map) { @@@ -1981,10 -1938,10 +1985,10 @@@ addActiveCache(cacheCtx); } catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(cctx.kernalContext(), e); + return new GridFinishedFuture<>(e); } - Set<K> skipped = null; + Set<KeyCacheObject> skipped = null; boolean rmv = lookup == null && invokeMap == null; @@@ -2288,10 -2244,8 +2292,8 @@@ }); return new GridEmbeddedFuture<>( - cctx.kernalContext(), - fut, - new C2<Boolean, Exception, Set<K>>() { - @Override public Set<K> apply(Boolean b, Exception e) { + new C2<Boolean, Exception, Set<KeyCacheObject>>() { + @Override public Set<KeyCacheObject> apply(Boolean b, Exception e) { if (e != null) throw new GridClosureException(e); @@@ -2670,11 -2661,11 +2672,11 @@@ loadFut.get(); } catch (IgniteCheckedException e) { - return new GridFinishedFutureEx<>(new GridCacheReturn(localResult()), e); + return new GridFinishedFuture<>(e); } - return commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn<V>>() { - @Override public GridCacheReturn<V> applyx(IgniteInternalFuture<IgniteInternalTx> txFut) throws IgniteCheckedException { + return commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { + @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) throws IgniteCheckedException { txFut.get(); return implicitRes; @@@ -2752,10 -2759,10 +2754,10 @@@ checkValid(); } catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(cctx.kernalContext(), e); + return new GridFinishedFuture<>(e); } - final GridCacheReturn<V> ret = new GridCacheReturn<>(false); + final GridCacheReturn ret = new GridCacheReturn(localResult(), false); if (F.isEmpty(keys0)) { if (implicit()) { @@@ -3009,8 -3006,8 +3011,8 @@@ isolation, isInvalidate(), -1L, - CU.<K, V>empty()) : + CU.empty0()) : - new GridFinishedFuture<>(cctx.kernalContext()); + new GridFinishedFuture<>(); } catch (IgniteCheckedException e) { setRollbackOnly(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index b2329bb,609bd3e..71098ee --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@@ -65,25 -65,25 +65,25 @@@ public class IgniteTxManager extends Gr private static final int TX_SALVAGE_TIMEOUT = Integer.getInteger(IGNITE_TX_SALVAGE_TIMEOUT, 100); /** Committing transactions. */ - private final ThreadLocal<IgniteInternalTx> threadCtx = new GridThreadLocalEx<>(); + private final ThreadLocal<IgniteInternalTx> threadCtx = new ThreadLocal<>(); /** Per-thread transaction map. */ - private final ConcurrentMap<Long, IgniteInternalTx<K, V>> threadMap = newMap(); + private final ConcurrentMap<Long, IgniteInternalTx> threadMap = newMap(); /** Per-ID map. */ - private final ConcurrentMap<GridCacheVersion, IgniteInternalTx<K, V>> idMap = newMap(); + private final ConcurrentMap<GridCacheVersion, IgniteInternalTx> idMap = newMap(); /** Per-ID map for near transactions. */ - private final ConcurrentMap<GridCacheVersion, IgniteInternalTx<K, V>> nearIdMap = newMap(); + private final ConcurrentMap<GridCacheVersion, IgniteInternalTx> nearIdMap = newMap(); /** TX handler. */ - private IgniteTxHandler<K, V> txHandler; + private IgniteTxHandler txHandler; /** All transactions. */ - private final Queue<IgniteInternalTx<K, V>> committedQ = new ConcurrentLinkedDeque8<>(); + private final Queue<IgniteInternalTx> committedQ = new ConcurrentLinkedDeque8<>(); /** Preparing transactions. */ - private final Queue<IgniteInternalTx<K, V>> prepareQ = new ConcurrentLinkedDeque8<>(); + private final Queue<IgniteInternalTx> prepareQ = new ConcurrentLinkedDeque8<>(); /** Minimum start version. */ private final ConcurrentNavigableMap<GridCacheVersion, AtomicInt> startVerCnts = @@@ -389,11 -389,11 +389,11 @@@ * @param tx Created transaction. * @return Started transaction. */ - @Nullable public <T extends IgniteInternalTx<K, V>> T onCreated(T tx) { - ConcurrentMap<GridCacheVersion, IgniteInternalTx<K, V>> txIdMap = transactionMap(tx); + @Nullable public <T extends IgniteInternalTx> T onCreated(T tx) { + ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx); // Start clean. - txContextReset(); + resetContext(); if (isCompleted(tx)) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java index 81fdbed,90f519d..193e3f8 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java @@@ -169,8 -175,13 +169,11 @@@ public class IgniteDataLoaderImpl<K, V assert ctx != null; this.ctx = ctx; - this.cacheName = cacheName; - this.flushQ = flushQ; - this.compact = compact; + this.cacheObjProc = ctx.cacheObjects(); + if (log == null) + log = U.logger(ctx, logRef, IgniteDataLoaderImpl.class); + ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes()); if (node == null) @@@ -458,13 -397,8 +461,13 @@@ return new IgniteFutureImpl<>(resFut); } - catch (IgniteException e) { + catch (Throwable e) { + resFut.onDone(e); + + if (e instanceof Error) + throw e; + - return new IgniteFinishedFutureImpl<>(ctx, e); + return new IgniteFinishedFutureImpl<>(e); } finally { leaveBusy(); @@@ -941,9 -870,9 +944,9 @@@ synchronized (this) { curFut0 = curFut; - curFut0.listenAsync(lsnr); + curFut0.listen(lsnr); - for (Map.Entry<K, V> entry : newEntries) + for (IgniteDataLoaderEntry entry : newEntries) entries.add(entry); if (entries.size() >= bufSize) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFinishPartitionsSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllFailoverSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEventAbstractTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e59ea498/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ----------------------------------------------------------------------