futures: api cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/387e164d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/387e164d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/387e164d Branch: refs/heads/ignite-47 Commit: 387e164d8be825e390055c012f81f854af131bf6 Parents: 65e6ad6 Author: Yakov Zhdanov <yzhda...@gridgain.com> Authored: Thu Mar 5 14:07:05 2015 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Thu Mar 5 14:07:05 2015 +0300 ---------------------------------------------------------------------- .../internal/ComputeTaskInternalFuture.java | 3 - .../client/impl/GridClientFutureAdapter.java | 3 - .../distributed/dht/GridDhtEmbeddedFuture.java | 16 +- .../cache/distributed/dht/GridDhtGetFuture.java | 8 +- .../dht/GridDhtTransactionalCacheAdapter.java | 4 +- .../dht/colocated/GridDhtColocatedCache.java | 8 +- .../colocated/GridDhtColocatedLockFuture.java | 4 +- .../distributed/near/GridNearGetFuture.java | 3 - .../distributed/near/GridNearLockFuture.java | 4 +- .../GridCacheDistributedFieldsQueryFuture.java | 3 - .../query/GridCacheFieldsQueryErrorFuture.java | 53 ----- .../query/GridCacheLocalFieldsQueryFuture.java | 3 - .../cache/query/GridCacheLocalQueryFuture.java | 3 - .../cache/query/GridCacheQueryErrorFuture.java | 3 - .../transactions/IgniteTxLocalAdapter.java | 235 ++++++++++--------- .../GridStreamerStageExecutionFuture.java | 3 - .../util/future/GridEmbeddedFuture.java | 10 +- .../util/nio/GridNioEmbeddedFuture.java | 3 - .../internal/util/nio/GridNioFutureImpl.java | 3 - .../internal/util/worker/GridWorkerFuture.java | 3 - .../lang/IgniteFutureCancelledException.java | 3 - .../lang/IgniteFutureTimeoutException.java | 3 - .../util/future/GridEmbeddedFutureSelfTest.java | 4 +- 23 files changed, 144 insertions(+), 241 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java index 219cf1b..8af977b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java @@ -34,9 +34,6 @@ import java.util.*; */ public class ComputeTaskInternalFuture<R> extends GridFutureAdapter<R> { /** */ - private static final long serialVersionUID = 0L; - - /** */ private ComputeTaskSession ses; /** */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientFutureAdapter.java index 1576d5f..d22157a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientFutureAdapter.java @@ -30,9 +30,6 @@ import java.util.logging.*; * Future adapter. */ public class GridClientFutureAdapter<R> extends AbstractQueuedSynchronizer implements GridClientFuture<R> { - /** */ - private static final long serialVersionUID = 0L; - /** Initial state. */ private static final int INIT = 0; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java index 54633cd..3cf22e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java @@ -34,12 +34,14 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem private Collection<Integer> invalidParts; /** - * @param embedded Embedded. * @param c Closure. - * @param fake Fake. + * @param embedded Embedded. */ - public GridDhtEmbeddedFuture(IgniteInternalFuture<B> embedded, IgniteBiClosure<B, Exception, A> c, boolean fake) { - super(embedded, c, fake); + public GridDhtEmbeddedFuture( + IgniteBiClosure<B, Exception, A> c, + IgniteInternalFuture<B> embedded + ) { + super(c, embedded); invalidParts = Collections.emptyList(); } @@ -48,8 +50,10 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem * @param embedded Future to embed. * @param c Embedding closure. */ - public GridDhtEmbeddedFuture(IgniteInternalFuture<B> embedded, - IgniteBiClosure<B, Exception, IgniteInternalFuture<A>> c) { + public GridDhtEmbeddedFuture( + IgniteInternalFuture<B> embedded, + IgniteBiClosure<B, Exception, IgniteInternalFuture<A>> c + ) { super(embedded, c); invalidParts = Collections.emptyList(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index da797b0..5cff433 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -209,7 +209,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col if (!F.isEmpty(fut.invalidPartitions())) retries.addAll(fut.invalidPartitions()); - add(new GridEmbeddedFuture<>(fut, + add(new GridEmbeddedFuture<>( new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo<K, V>>>() { @Override public Collection<GridCacheEntryInfo<K, V>> apply(Object o, Exception e) { if (e != null) { // Check error first. @@ -240,8 +240,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col return Collections.emptyList(); } }, - false) - ); + fut)); } /** @@ -408,7 +407,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col } return new GridEmbeddedFuture<>( - fut, new C2<Map<K, V>, Exception, Collection<GridCacheEntryInfo<K, V>>>() { @Override public Collection<GridCacheEntryInfo<K, V>> apply(Map<K, V> map, Exception e) { if (e != null) { @@ -432,7 +430,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col } } }, - false); + fut); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index dd70681..7eda930 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -892,7 +892,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach final GridCacheVersion mappedVer = fut.version(); return new GridDhtEmbeddedFuture<>( - fut, new C2<Boolean, Exception, GridNearLockResponse<K, V>>() { @Override public GridNearLockResponse<K, V> apply(Boolean b, Exception e) { if (e != null) @@ -911,7 +910,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach return res; } - }, false); + }, + fut); } } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index dbbac1b..791fb73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -731,7 +731,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte fut.map(); return new GridDhtEmbeddedFuture<>( - fut, new C2<Boolean, Exception, Exception>() { @Override public Exception apply(Boolean b, Exception e) { if (e != null) @@ -741,7 +740,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte return e; } - }, false); + }, + fut); } else { // Handle implicit locks for pessimistic transactions. @@ -757,7 +757,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte accessTtl); return new GridDhtEmbeddedFuture<>( - txFut, new C2<GridCacheReturn<V>, Exception, Exception>() { @Override public Exception apply(GridCacheReturn<V> ret, Exception e) { @@ -768,7 +767,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte return e; } - }, false); + }, + txFut); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index fc83a5a..af1ee1b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -877,7 +877,6 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity // Add new future. add(new GridEmbeddedFuture<>( - fut, new C2<Exception, Exception, Boolean>() { @Override public Boolean apply(Exception resEx, Exception e) { if (CU.isLockTimeoutOrCancelled(e) || @@ -923,8 +922,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity return true; } }, - false - )); + fut)); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 1398f69..ed2565b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -662,9 +662,6 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma */ private class MiniFuture extends GridFutureAdapter<Map<K, V>> { /** */ - private static final long serialVersionUID = 0L; - - /** */ private final IgniteUuid futId = IgniteUuid.randomUuid(); /** Node ID. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index e204f05..855b96c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -954,7 +954,6 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B // Add new future. add(new GridEmbeddedFuture<>( - fut, new C2<GridNearLockResponse<K, V>, Exception, Boolean>() { @Override public Boolean apply(GridNearLockResponse<K, V> res, Exception e) { if (CU.isLockTimeoutOrCancelled(e) || @@ -1081,8 +1080,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B return true; } }, - false - )); + fut)); } else { final MiniFuture fut = new MiniFuture(node, mappedKeys, mappings); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedFieldsQueryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedFieldsQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedFieldsQueryFuture.java index 7ad8994..1aa7907 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedFieldsQueryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedFieldsQueryFuture.java @@ -32,9 +32,6 @@ import java.util.*; public class GridCacheDistributedFieldsQueryFuture extends GridCacheDistributedQueryFuture<Object, Object, List<Object>> implements GridCacheQueryMetadataAware { - /** */ - private static final long serialVersionUID = 0L; - /** Meta data future. */ private final GridFutureAdapter<List<GridQueryFieldMetadata>> metaFut; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheFieldsQueryErrorFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheFieldsQueryErrorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheFieldsQueryErrorFuture.java deleted file mode 100644 index 222abc1..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheFieldsQueryErrorFuture.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query; - -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.query.*; -import org.apache.ignite.internal.util.future.*; - -import java.util.*; - -/** -* Error future for fields query. -*/ -public class GridCacheFieldsQueryErrorFuture extends GridCacheQueryErrorFuture<List<?>> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private boolean incMeta; - - /** - * @param ctx Context. - * @param th Error. - * @param incMeta Include metadata flag. - */ - public GridCacheFieldsQueryErrorFuture(GridKernalContext ctx, Throwable th, boolean incMeta) { - super(ctx, th); - - this.incMeta = incMeta; - } - - /** - * @return Metadata. - */ - public IgniteInternalFuture<List<GridQueryFieldMetadata>> metadata() { - return new GridFinishedFuture<>(incMeta ? Collections.<GridQueryFieldMetadata>emptyList() : null); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalFieldsQueryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalFieldsQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalFieldsQueryFuture.java index e6d545a..7e6a02c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalFieldsQueryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalFieldsQueryFuture.java @@ -31,9 +31,6 @@ import java.util.*; public class GridCacheLocalFieldsQueryFuture extends GridCacheLocalQueryFuture<Object, Object, List<Object>> implements GridCacheQueryMetadataAware { - /** */ - private static final long serialVersionUID = 0L; - /** Meta data future. */ private final GridFutureAdapter<List<GridQueryFieldMetadata>> metaFut; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java index 3204653..5c15640 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java @@ -33,9 +33,6 @@ import java.util.*; */ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdapter<K, V, R> { /** */ - private static final long serialVersionUID = 0L; - - /** */ private Runnable run; /** */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java index 627d6b5..2999e7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java @@ -28,9 +28,6 @@ import java.util.*; * Error future. */ public class GridCacheQueryErrorFuture<T> extends GridFinishedFuture<Collection<T>> implements CacheQueryFuture<T> { - /** */ - private static final long serialVersionUID = 0L; - /** * @param ctx Context. * @param err Error. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 67d33fc..1e349ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1414,158 +1414,163 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> final Collection<K> loaded = new HashSet<>(); return new GridEmbeddedFuture<>( - loadMissing( - cacheCtx, - true, false, missedMap.keySet(), deserializePortable, skipVals, new CI2<K, V>() { - /** */ - private GridCacheVersion nextVer; - - @Override public void apply(K key, V val) { - if (isRollbackOnly()) { - if (log.isDebugEnabled()) - log.debug("Ignoring loaded value for read because transaction was rolled back: " + - IgniteTxLocalAdapter.this); + new C2<Boolean, Exception, Map<K, V>>() { + @Override public Map<K, V> apply(Boolean b, Exception e) { + if (e != null) { + setRollbackOnly(); - return; + throw new GridClosureException(e); } - GridCacheVersion ver = missedMap.get(key); - - if (ver == null) { - if (log.isDebugEnabled()) - log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']'); + if (!b && !readCommitted()) { + // There is no store - we must mark the entries. + for (K key : missedMap.keySet()) { + IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key)); - return; + if (txEntry != null) + txEntry.markValid(); + } } - V visibleVal = val; + if (readCommitted()) { + Collection<K> notFound = new HashSet<>(missedMap.keySet()); - IgniteTxKey<K> txKey = cacheCtx.txKey(key); + notFound.removeAll(loaded); - IgniteTxEntry<K, V> txEntry = entry(txKey); + // In read-committed mode touch entries that have just been read. + for (K key : notFound) { + IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key)); - if (txEntry != null) { - if (!readCommitted()) - txEntry.readValue(val); + GridCacheEntryEx<K, V> entry = txEntry == null ? cacheCtx.cache().peekEx(key) : + txEntry.cached(); - if (!F.isEmpty(txEntry.entryProcessors())) - visibleVal = txEntry.applyEntryProcessors(visibleVal); + if (entry != null) + cacheCtx.evicts().touch(entry, topologyVersion()); + } } - // In pessimistic mode we hold the lock, so filter validation - // should always be valid. - if (pessimistic()) - ver = null; + return map; + } + }, + loadMissing( + cacheCtx, + true, + false, + missedMap.keySet(), + deserializePortable, + skipVals, + new CI2<K, V>() { + /** */ + private GridCacheVersion nextVer; - // Initialize next version. - if (nextVer == null) - nextVer = cctx.versions().next(topologyVersion()); + @Override public void apply(K key, V val) { + if (isRollbackOnly()) { + if (log.isDebugEnabled()) + log.debug("Ignoring loaded value for read because transaction was rolled back: " + + IgniteTxLocalAdapter.this); - while (true) { - assert txEntry != null || readCommitted() || groupLock() || skipVals; + return; + } - GridCacheEntryEx<K, V> e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached(); + GridCacheVersion ver = missedMap.get(key); - try { - // Must initialize to true since even if filter didn't pass, - // we still record the transaction value. - boolean set; + if (ver == null) { + if (log.isDebugEnabled()) + log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']'); - try { - set = e.versionedValue(val, ver, nextVer); - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in transaction getAll method " + - "(will try again): " + e); + return; + } - if (pessimistic() && !readCommitted() && !isRollbackOnly() && - (!groupLock() || F.eq(e.key(), groupLockKey()))) { - U.error(log, "Inconsistent transaction state (entry got removed while " + - "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]"); + V visibleVal = val; - setRollbackOnly(); + IgniteTxKey<K> txKey = cacheCtx.txKey(key); - return; - } + IgniteTxEntry<K, V> txEntry = entry(txKey); - if (txEntry != null) - txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes()); + if (txEntry != null) { + if (!readCommitted()) + txEntry.readValue(val); - continue; // While loop. - } + if (!F.isEmpty(txEntry.entryProcessors())) + visibleVal = txEntry.applyEntryProcessors(visibleVal); + } - // In pessimistic mode, we should always be able to set. - assert set || !pessimistic(); + // In pessimistic mode we hold the lock, so filter validation + // should always be valid. + if (pessimistic()) + ver = null; - if (readCommitted() || groupLock() || skipVals) { - cacheCtx.evicts().touch(e, topologyVersion()); + // Initialize next version. + if (nextVer == null) + nextVer = cctx.versions().next(topologyVersion()); - if (visibleVal != null) - map.put(key, (V)CU.skipValue(visibleVal, skipVals)); - } - else { - assert txEntry != null; + while (true) { + assert txEntry != null || readCommitted() || groupLock() || skipVals; - txEntry.setAndMarkValid(val); + GridCacheEntryEx<K, V> e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached(); - if (visibleVal != null) - map.put(key, visibleVal); - } + try { + // Must initialize to true since even if filter didn't pass, + // we still record the transaction value. + boolean set; - loaded.add(key); + try { + set = e.versionedValue(val, ver, nextVer); + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in transaction getAll method " + + "(will try again): " + e); - if (log.isDebugEnabled()) - log.debug("Set value loaded from store into entry from transaction [set=" + set + - ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']'); + if (pessimistic() && !readCommitted() && !isRollbackOnly() && + (!groupLock() || F.eq(e.key(), groupLockKey()))) { + U.error(log, "Inconsistent transaction state (entry got removed while " + + "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]"); - break; // While loop. - } - catch (IgniteCheckedException ex) { - throw new IgniteException("Failed to put value for cache entry: " + e, ex); - } - } - } - }), - new C2<Boolean, Exception, Map<K, V>>() { - @Override public Map<K, V> apply(Boolean b, Exception e) { - if (e != null) { - setRollbackOnly(); + setRollbackOnly(); - throw new GridClosureException(e); - } + return; + } - if (!b && !readCommitted()) { - // There is no store - we must mark the entries. - for (K key : missedMap.keySet()) { - IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key)); + if (txEntry != null) + txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes()); - if (txEntry != null) - txEntry.markValid(); - } - } + continue; // While loop. + } - if (readCommitted()) { - Collection<K> notFound = new HashSet<>(missedMap.keySet()); + // In pessimistic mode, we should always be able to set. + assert set || !pessimistic(); - notFound.removeAll(loaded); + if (readCommitted() || groupLock() || skipVals) { + cacheCtx.evicts().touch(e, topologyVersion()); - // In read-committed mode touch entries that have just been read. - for (K key : notFound) { - IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key)); + if (visibleVal != null) + map.put(key, (V)CU.skipValue(visibleVal, skipVals)); + } + else { + assert txEntry != null; - GridCacheEntryEx<K, V> entry = txEntry == null ? cacheCtx.cache().peekEx(key) : - txEntry.cached(); + txEntry.setAndMarkValid(val); - if (entry != null) - cacheCtx.evicts().touch(entry, topologyVersion()); + if (visibleVal != null) + map.put(key, visibleVal); + } + + loaded.add(key); + + if (log.isDebugEnabled()) + log.debug("Set value loaded from store into entry from transaction [set=" + set + + ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']'); + + break; // While loop. + } + catch (IgniteCheckedException ex) { + throw new IgniteException("Failed to put value for cache entry: " + e, ex); + } } } - - return map; - } - }, - false); + }) + ); } /** {@inheritDoc} */ @@ -1731,7 +1736,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> return fut1.isDone() ? new GridFinishedFuture<>(finClos.apply(fut1.get(), null)) : - new GridEmbeddedFuture<>(fut1, finClos, false); + new GridEmbeddedFuture<>(finClos, fut1); } catch (GridClosureException e) { return new GridFinishedFuture<>(e.unwrap()); @@ -2239,7 +2244,6 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> }); return new GridEmbeddedFuture<>( - fut, new C2<Boolean, Exception, Set<K>>() { @Override public Set<K> apply(Boolean b, Exception e) { if (e != null) @@ -2247,8 +2251,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> return Collections.emptySet(); } - }, - false + }, fut ); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java index 1b3269a..af48159 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java @@ -36,9 +36,6 @@ import java.util.concurrent.*; * Streamer execution future. */ public class GridStreamerStageExecutionFuture extends GridFutureAdapter<Object> { - /** */ - private static final long serialVersionUID = 0L; - /** Logger. */ private IgniteLogger log; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java index d982b78..7da6423 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java @@ -29,20 +29,16 @@ import org.apache.ignite.lang.*; */ @SuppressWarnings({"NullableProblems"}) public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> { - /** */ - private static final long serialVersionUID = 0L; - /** Embedded future to wait for. */ private IgniteInternalFuture<B> embedded; /** - * @param embedded Embedded future. * @param c Closure to execute upon completion of embedded future. + * @param embedded Embedded future. */ public GridEmbeddedFuture( - IgniteInternalFuture<B> embedded, final IgniteBiClosure<B, Exception, A> c, - boolean fake + IgniteInternalFuture<B> embedded ) { assert embedded != null; assert c != null; @@ -220,14 +216,12 @@ public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> { private abstract class AL1 extends AsyncListener1 { /** */ private static final long serialVersionUID = 0L; - } /** Typedef. */ private abstract class AL2 extends AsyncListener2 { /** */ private static final long serialVersionUID = 0L; - } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java index 693dfbd..7311c87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java @@ -28,9 +28,6 @@ import java.io.*; * Future that delegates to some other future. */ public class GridNioEmbeddedFuture<R> extends GridNioFutureImpl<R> { - /** */ - private static final long serialVersionUID = 0L; - /** * Callback to notify that future is finished. * This method must delegate to {@link #onDone(GridNioFuture, Throwable)} method. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java index 03df52e..f4c240e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java @@ -33,9 +33,6 @@ import java.util.concurrent.locks.*; * Default future implementation. */ public class GridNioFutureImpl<R> extends AbstractQueuedSynchronizer implements GridNioFuture<R> { - /** */ - private static final long serialVersionUID = 0L; - /** Initial state. */ private static final int INIT = 0; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorkerFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorkerFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorkerFuture.java index e1d156a..59dd975 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorkerFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorkerFuture.java @@ -25,9 +25,6 @@ import org.apache.ignite.internal.util.future.*; */ public class GridWorkerFuture<T> extends GridFutureAdapter<T> { /** */ - private static final long serialVersionUID = 0L; - - /** */ private GridWorker w; /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/lang/IgniteFutureCancelledException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteFutureCancelledException.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteFutureCancelledException.java index f0c01a2..928ce5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteFutureCancelledException.java +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteFutureCancelledException.java @@ -24,9 +24,6 @@ import org.jetbrains.annotations.*; * Future computation cannot be retrieved because it was cancelled. */ public class IgniteFutureCancelledException extends IgniteException { - /** */ - private static final long serialVersionUID = 0L; - /** * Creates new exception with given error message. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/main/java/org/apache/ignite/lang/IgniteFutureTimeoutException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteFutureTimeoutException.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteFutureTimeoutException.java index c2f8945..84ce7ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteFutureTimeoutException.java +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteFutureTimeoutException.java @@ -24,9 +24,6 @@ import org.jetbrains.annotations.*; * Future computation completion is timed out. */ public class IgniteFutureTimeoutException extends IgniteException { - /** */ - private static final long serialVersionUID = 0L; - /** * Creates new exception with given error message. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/387e164d/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridEmbeddedFutureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridEmbeddedFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridEmbeddedFutureSelfTest.java index 45633ae..62690bc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridEmbeddedFutureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridEmbeddedFutureSelfTest.java @@ -85,7 +85,7 @@ public class GridEmbeddedFutureSelfTest extends GridCommonAbstractTest { final GridFutureAdapter<Integer> origFut = new GridFutureAdapter<>(); // Embedded future to test. - GridEmbeddedFuture<Double, Integer> embFut = new GridEmbeddedFuture<>(origFut, + GridEmbeddedFuture<Double, Integer> embFut = new GridEmbeddedFuture<>( new C2<Integer, Exception, Double>() { @Override public Double apply(Integer val, Exception e) { if (x instanceof Error) @@ -99,7 +99,7 @@ public class GridEmbeddedFutureSelfTest extends GridCommonAbstractTest { return null; } }, - false); + origFut); assertFalse("Expect original future is not complete.", origFut.isDone()); assertFalse("Expect embedded future is not complete.", embFut.isDone());