ignite-579 Avoid synchronous waiting for new affinity version in cache get future.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4833ec98 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4833ec98 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4833ec98 Branch: refs/heads/ignite-tests-todo Commit: 4833ec989171efed810bb312aec401f2afd54540 Parents: 42e467c Author: sboikov <sboi...@gridgain.com> Authored: Wed Mar 25 18:04:04 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Mar 25 18:04:04 2015 +0300 ---------------------------------------------------------------------- .../GridFutureRemapTimeoutObject.java | 73 ++++++++++++++++++ .../dht/GridPartitionedGetFuture.java | 24 +++++- .../distributed/near/GridNearGetFuture.java | 79 +++++--------------- 3 files changed, 110 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4833ec98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java new file mode 100644 index 0000000..8f3cfe5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.timeout.*; +import org.apache.ignite.internal.util.future.*; + +import java.util.concurrent.atomic.*; + +/** + * Future remap timeout object. + */ +public class GridFutureRemapTimeoutObject extends GridTimeoutObjectAdapter { + /** */ + private final GridFutureAdapter<?> fut; + + /** Finished flag. */ + private final AtomicBoolean finished = new AtomicBoolean(); + + /** Topology version to wait. */ + private final AffinityTopologyVersion topVer; + + /** Exception cause. */ + private final IgniteCheckedException e; + + /** + * @param fut Future. + * @param timeout Timeout. + * @param topVer Topology version timeout was created on. + * @param e Exception cause. + */ + public GridFutureRemapTimeoutObject( + GridFutureAdapter<?> fut, + long timeout, + AffinityTopologyVersion topVer, + IgniteCheckedException e) { + super(timeout); + + this.fut = fut; + this.topVer = topVer; + this.e = e; + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + if (finish()) // Fail the whole get future, else remap happened concurrently. + fut.onDone(new IgniteCheckedException("Failed to wait for topology version to change: " + topVer, e)); + } + + /** + * @return Guard against concurrent completion. + */ + public boolean finish() { + return finished.compareAndSet(false, true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4833ec98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index d9c1f3c..bb6d9e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; @@ -606,12 +607,27 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M if (log.isDebugEnabled()) log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this); - AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); + final AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); - // Remap. - map(keys.keySet(), F.t(node, keys), updTopVer); + final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this, + cctx.kernalContext().config().getNetworkTimeout(), + updTopVer, + e); - onDone(Collections.<K, V>emptyMap()); + cctx.affinity().affinityReadyFuture(updTopVer).listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + if (timeout.finish()) { + cctx.kernalContext().timeout().removeTimeoutObject(timeout); + + // Remap. + map(keys.keySet(), F.t(node, keys), updTopVer); + + onDone(Collections.<K, V>emptyMap()); + } + } + }); + + cctx.kernalContext().timeout().addTimeoutObject(timeout); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4833ec98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 9f8f550..0fe304d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -23,10 +23,10 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; @@ -725,33 +725,27 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma if (log.isDebugEnabled()) log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this); - AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); + final AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); - if (updTopVer.compareTo(topVer) > 0) { - // Remap. - map(keys.keySet(), F.t(node, keys), updTopVer); + final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this, + cctx.kernalContext().config().getNetworkTimeout(), + updTopVer, + e); - onDone(Collections.<K, V>emptyMap()); - } - else { - final RemapTimeoutObject timeout = new RemapTimeoutObject( - cctx.kernalContext().config().getNetworkTimeout(), topVer, e); - - cctx.discovery().topologyFuture(topVer.topologyVersion() + 1).listen(new CI1<IgniteInternalFuture<Long>>() { - @Override public void apply(IgniteInternalFuture<Long> longIgniteFuture) { - if (timeout.finish()) { - cctx.kernalContext().timeout().removeTimeoutObject(timeout); + cctx.affinity().affinityReadyFuture(updTopVer).listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + if (timeout.finish()) { + cctx.kernalContext().timeout().removeTimeoutObject(timeout); - // Remap. - map(keys.keySet(), F.t(node, keys), cctx.affinity().affinityTopologyVersion()); + // Remap. + map(keys.keySet(), F.t(node, keys), updTopVer); - onDone(Collections.<K, V>emptyMap()); - } + onDone(Collections.<K, V>emptyMap()); } - }); + } + }); - cctx.kernalContext().timeout().addTimeoutObject(timeout); - } + cctx.kernalContext().timeout().addTimeoutObject(timeout); } /** @@ -814,45 +808,6 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma return S.toString(MiniFuture.class, this); } - /** - * Remap timeout object. - */ - private class RemapTimeoutObject extends GridTimeoutObjectAdapter { - /** Finished flag. */ - private AtomicBoolean finished = new AtomicBoolean(); - - /** Topology version to wait. */ - private AffinityTopologyVersion topVer; - - /** Exception cause. */ - private IgniteCheckedException e; - - /** - * @param timeout Timeout. - * @param topVer Topology version timeout was created on. - */ - private RemapTimeoutObject(long timeout, @NotNull AffinityTopologyVersion topVer, IgniteCheckedException e) { - super(timeout); - - this.topVer = topVer; - this.e = e; - } - - /** {@inheritDoc} */ - @Override public void onTimeout() { - if (finish()) - // Fail the whole get future. - onDone(new IgniteCheckedException("Failed to wait for topology version to change: " - + (topVer.topologyVersion() + 1), e)); - // else remap happened concurrently. - } - - /** - * @return Guard against concurrent completion. - */ - public boolean finish() { - return finished.compareAndSet(false, true); - } - } } + }