ignite-579 Avoid synchronous waiting for new affinity version in cache get future.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4833ec98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4833ec98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4833ec98

Branch: refs/heads/ignite-tc-jclient
Commit: 4833ec989171efed810bb312aec401f2afd54540
Parents: 42e467c
Author: sboikov <sboi...@gridgain.com>
Authored: Wed Mar 25 18:04:04 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Wed Mar 25 18:04:04 2015 +0300

----------------------------------------------------------------------
 .../GridFutureRemapTimeoutObject.java           | 73 ++++++++++++++++++
 .../dht/GridPartitionedGetFuture.java           | 24 +++++-
 .../distributed/near/GridNearGetFuture.java     | 79 +++++---------------
 3 files changed, 110 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4833ec98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java
new file mode 100644
index 0000000..8f3cfe5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.timeout.*;
+import org.apache.ignite.internal.util.future.*;
+
+import java.util.concurrent.atomic.*;
+
+/**
+ * Future remap timeout object.
+ */
+public class GridFutureRemapTimeoutObject extends GridTimeoutObjectAdapter {
+    /** */
+    private final GridFutureAdapter<?> fut;
+
+    /** Finished flag. */
+    private final AtomicBoolean finished = new AtomicBoolean();
+
+    /** Topology version to wait. */
+    private final AffinityTopologyVersion topVer;
+
+    /** Exception cause. */
+    private final IgniteCheckedException e;
+
+    /**
+     * @param fut Future.
+     * @param timeout Timeout.
+     * @param topVer Topology version timeout was created on.
+     * @param e Exception cause.
+     */
+    public GridFutureRemapTimeoutObject(
+        GridFutureAdapter<?> fut,
+        long timeout,
+        AffinityTopologyVersion topVer,
+        IgniteCheckedException e) {
+        super(timeout);
+
+        this.fut = fut;
+        this.topVer = topVer;
+        this.e = e;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTimeout() {
+        if (finish()) // Fail the whole get future, else remap happened concurrently.
+            fut.onDone(new IgniteCheckedException("Failed to wait for topology version to change: " + topVer, e));
+    }
+
+    /**
+     * @return Guard against concurrent completion.
+     */
+    public boolean finish() {
+        return finished.compareAndSet(false, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4833ec98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index d9c1f3c..bb6d9e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.*;
@@ -606,12 +607,27 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
             if (log.isDebugEnabled())
                 log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this);
 
-            AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
+            final AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
 
-            // Remap.
-            map(keys.keySet(), F.t(node, keys), updTopVer);
+            final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
+                cctx.kernalContext().config().getNetworkTimeout(),
+                updTopVer,
+                e);
 
-            onDone(Collections.<K, V>emptyMap());
+            cctx.affinity().affinityReadyFuture(updTopVer).listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                    if (timeout.finish()) {
+                        cctx.kernalContext().timeout().removeTimeoutObject(timeout);
+
+                        // Remap.
+                        map(keys.keySet(), F.t(node, keys), updTopVer);
+
+                        onDone(Collections.<K, V>emptyMap());
+                    }
+                }
+            });
+
+            cctx.kernalContext().timeout().addTimeoutObject(timeout);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4833ec98/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 9f8f550..0fe304d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -23,10 +23,10 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.tostring.*;
@@ -725,33 +725,27 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
             if (log.isDebugEnabled())
                 log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this);
 
-            AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
+            final AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
 
-            if (updTopVer.compareTo(topVer) > 0) {
-                // Remap.
-                map(keys.keySet(), F.t(node, keys), updTopVer);
+            final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
+                cctx.kernalContext().config().getNetworkTimeout(),
+                updTopVer,
+                e);
 
-                onDone(Collections.<K, V>emptyMap());
-            }
-            else {
-                final RemapTimeoutObject timeout = new RemapTimeoutObject(
-                    cctx.kernalContext().config().getNetworkTimeout(), topVer, e);
-
-                cctx.discovery().topologyFuture(topVer.topologyVersion() + 1).listen(new CI1<IgniteInternalFuture<Long>>() {
-                    @Override public void apply(IgniteInternalFuture<Long> longIgniteFuture) {
-                        if (timeout.finish()) {
-                            cctx.kernalContext().timeout().removeTimeoutObject(timeout);
+            cctx.affinity().affinityReadyFuture(updTopVer).listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                    if (timeout.finish()) {
+                        cctx.kernalContext().timeout().removeTimeoutObject(timeout);
 
-                            // Remap.
-                            map(keys.keySet(), F.t(node, keys), cctx.affinity().affinityTopologyVersion());
+                        // Remap.
+                        map(keys.keySet(), F.t(node, keys), updTopVer);
 
-                            onDone(Collections.<K, V>emptyMap());
-                        }
+                        onDone(Collections.<K, V>emptyMap());
                     }
-                });
+                }
+            });
 
-                cctx.kernalContext().timeout().addTimeoutObject(timeout);
-            }
+            cctx.kernalContext().timeout().addTimeoutObject(timeout);
         }
 
         /**
@@ -814,45 +808,6 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
             return S.toString(MiniFuture.class, this);
         }
 
-        /**
-         * Remap timeout object.
-         */
-        private class RemapTimeoutObject extends GridTimeoutObjectAdapter {
-            /** Finished flag. */
-            private AtomicBoolean finished = new AtomicBoolean();
-
-            /** Topology version to wait. */
-            private AffinityTopologyVersion topVer;
-
-            /** Exception cause. */
-            private IgniteCheckedException e;
-
-            /**
-             * @param timeout Timeout.
-             * @param topVer Topology version timeout was created on.
-             */
-            private RemapTimeoutObject(long timeout, @NotNull AffinityTopologyVersion topVer, IgniteCheckedException e) {
-                super(timeout);
-
-                this.topVer = topVer;
-                this.e = e;
-            }
-
-            /** {@inheritDoc} */
-            @Override public void onTimeout() {
-                if (finish())
-                    // Fail the whole get future.
-                    onDone(new IgniteCheckedException("Failed to wait for topology version to change: "
-                        + (topVer.topologyVersion() + 1), e));
-                // else remap happened concurrently.
-            }
-
-            /**
-             * @return Guard against concurrent completion.
-             */
-            public boolean finish() {
-                return finished.compareAndSet(false, true);
-            }
-        }
     }
+
 }

Reply via email to