ignite-484 - v2

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/081e75bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/081e75bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/081e75bc

Branch: refs/heads/ignite-484
Commit: 081e75bce608394937ef8e4d62e4aa12a2cc3b09
Parents: 7000722
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Wed May 13 10:29:36 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Wed May 13 10:29:36 2015 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtLocalPartition.java  | 19 +++++
 .../h2/twostep/messages/GridQueryRequest.java   | 16 ++--
 .../query/h2/twostep/GridMapQueryExecutor.java  | 84 ++++++++++++--------
 .../h2/twostep/GridReduceQueryExecutor.java     | 12 ++-
 4 files changed, 85 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/081e75bc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 0749f66..dc4982e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -70,6 +70,10 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>
     @GridToStringExclude
     private final GridFutureAdapter<?> rent;
 
+    /** Own future. */
+    @GridToStringExclude
+    private final GridFutureAdapter<?> own;
+
     /** Entries map. */
     private final ConcurrentMap<KeyCacheObject, GridDhtCacheEntry> map;
 
@@ -111,6 +115,12 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>
             }
         };
 
+        own = new GridFutureAdapter<Object>() {
+            @Override public String toString() {
+                return "PartitionOwnFuture [part=" + 
GridDhtLocalPartition.this + ", map=" + map + ']';
+            }
+        };
+
         map = new ConcurrentHashMap8<>(cctx.config().getStartSize() /
             cctx.affinity().partitions());
 
@@ -385,6 +395,8 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>
                 // No need to keep history any more.
                 evictHist = null;
 
+                own.onDone();
+
                 return true;
             }
         }
@@ -419,6 +431,13 @@ public class GridDhtLocalPartition implements 
Comparable<GridDhtLocalPartition>
     }
 
     /**
+     * @return Future that is completed once the partition reaches the 
state {@link GridDhtPartitionState#OWNING}.
+     */
+    public IgniteInternalFuture<?> owningFuture() {
+        return own;
+    }
+
+    /**
      * @param updateSeq Update sequence.
      * @return Future for evict attempt.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/081e75bc/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
index 319a818..2d53944 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.processors.query.h2.twostep.messages;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -49,7 +50,7 @@ public class GridQueryRequest implements Message {
     private Collection<GridCacheSqlQuery> qrys;
 
     /** Topology version. */
-    private long topVer;
+    private AffinityTopologyVersion topVer;
 
     /** */
     @GridDirectCollection(String.class)
@@ -70,7 +71,12 @@ public class GridQueryRequest implements Message {
      * @param topVer Topology version.
      * @param extraSpaces All space names participating in query other than 
{@code space}.
      */
-    public GridQueryRequest(long reqId, int pageSize, String space, 
Collection<GridCacheSqlQuery> qrys, long topVer,
+    public GridQueryRequest(
+        long reqId,
+        int pageSize,
+        String space,
+        Collection<GridCacheSqlQuery> qrys,
+        AffinityTopologyVersion topVer,
         List<String> extraSpaces) {
         this.reqId = reqId;
         this.pageSize = pageSize;
@@ -91,7 +97,7 @@ public class GridQueryRequest implements Message {
     /**
      * @return Topology version.
      */
-    public long topologyVersion() {
+    public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -165,7 +171,7 @@ public class GridQueryRequest implements Message {
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeLong("topVer", topVer))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
@@ -221,7 +227,7 @@ public class GridQueryRequest implements Message {
                 reader.incrementState();
 
             case 4:
-                topVer = reader.readLong("topVer");
+                topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/081e75bc/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 2483912..d4cdb4e 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -202,12 +202,31 @@ public class GridMapQueryExecutor {
 
     /**
      * @param cacheName Cache name.
+     * @param topVer Topology version.
      * @return Cache context or {@code null} if none.
      */
-    @Nullable private GridCacheContext<?,?> cacheContext(String cacheName) {
+    @Nullable private GridCacheContext<?,?> cacheContext(String cacheName, 
AffinityTopologyVersion topVer) {
         GridCacheAdapter<?,?> cache = ctx.cache().internalCache(cacheName);
 
-        return cache == null ? null : cache.context();
+        if (cache == null) // Since we've waited for cache affinity 
updates, this must be a misconfiguration.
+            throw new CacheException("Cache does not exist on current node: 
[nodeId=" + ctx.localNodeId() +
+                ", cache=" + cacheName + ", topVer=" + topVer + "]");
+
+        return cache.context();
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void awaitForCacheAffinity(AffinityTopologyVersion topVer) throws 
IgniteCheckedException {
+        if (topVer == null)
+            return; // Backward compatibility.
+
+        IgniteInternalFuture<?> fut = 
ctx.cache().context().exchange().affinityReadyFuture(topVer);
+
+        if (fut != null)
+            fut.get();
     }
 
     /**
@@ -224,6 +243,13 @@ public class GridMapQueryExecutor {
         List<GridDhtLocalPartition> reserved = new ArrayList<>();
 
         try {
+            // Topology version can be null in rolling restart with previous 
version!
+            final AffinityTopologyVersion topVer = req.topologyVersion();
+
+            // Await all caches to be deployed on this node and all the needed 
topology changes to arrive.
+            awaitForCacheAffinity(topVer);
+
+            // Unmarshal query params.
             Collection<GridCacheSqlQuery> qrys;
 
             try {
@@ -240,48 +266,39 @@ public class GridMapQueryExecutor {
                 throw new IgniteException(e);
             }
 
-            List<GridCacheContext<?,?>> cctxs = new ArrayList<>();
+            // Reserve primary partitions.
+            if (topVer != null) {
+                for (String cacheName : F.concat(true, req.space(), 
req.extraSpaces())) {
+                    GridCacheContext<?,?> cctx = cacheContext(cacheName, 
topVer);
 
-            for (String cacheName : F.concat(true, req.space(), 
req.extraSpaces())) {
-                GridCacheContext<?,?> cctx = cacheContext(cacheName);
+                    Set<Integer> partIds = 
cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
 
-                if (cctx == null) { // Cache was not deployed yet.
-                    sendRetry(node, req.requestId());
+                    for (int partId : partIds) {
+                        GridDhtLocalPartition part = 
cctx.topology().localPartition(partId, topVer, false);
 
-                    return;
-                }
-                else
-                    cctxs.add(cctx);
-            }
-
-            for (GridCacheContext<?,?> cctx : cctxs) { // Lock primary 
partitions.
-                // TODO how to get all partitions for topology version 
consistently?
-                List<GridDhtLocalPartition> parts = 
cctx.topology().localPartitions();
-                AffinityTopologyVersion affTopVer = 
cctx.topology().topologyVersion();
-
-                if (affTopVer.topologyVersion() != req.topologyVersion()) {
-                    sendRetry(node, req.requestId());
+                        if (part != null) {
+                            // Await for owning state.
+                            part.owningFuture().get();
 
-                    return;
-                }
+                            if (part.reserve()) {
+                                reserved.add(part);
 
-                for (GridDhtLocalPartition part : parts) {
-                    if (!part.primary(affTopVer))
-                        continue;
+                                continue;
+                            }
+                        }
 
-                    if (!part.reserve()) {
+                        // Failed to reserve the partition.
                         sendRetry(node, req.requestId());
 
                         return;
                     }
-
-                    reserved.add(part);
                 }
             }
 
-            GridCacheContext<?,?> cctx = cctxs.get(0); // Main cache context.
+            // Prepare to run queries.
+            GridCacheContext<?,?> mainCctx = cacheContext(req.space(), topVer);
 
-            qr = new QueryResults(req.requestId(), qrys.size(), cctx);
+            qr = new QueryResults(req.requestId(), qrys.size(), mainCctx);
 
             if (nodeRess.put(req.requestId(), qr) != null)
                 throw new IllegalStateException();
@@ -293,10 +310,8 @@ public class GridMapQueryExecutor {
             // Run queries.
             int i = 0;
 
-            String space = req.space();
-
             for (GridCacheSqlQuery qry : qrys) {
-                ResultSet rs = h2.executeSqlQueryWithTimer(space, 
h2.connectionForSpace(space), qry.query(),
+                ResultSet rs = h2.executeSqlQueryWithTimer(req.space(), 
h2.connectionForSpace(req.space()), qry.query(),
                     F.asList(qry.parameters()));
 
                 if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
@@ -305,7 +320,7 @@ public class GridMapQueryExecutor {
                         "SQL query executed.",
                         EVT_CACHE_QUERY_EXECUTED,
                         CacheQueryType.SQL.name(),
-                        cctx.namex(),
+                        mainCctx.namex(),
                         null,
                         qry.query(),
                         null,
@@ -348,6 +363,7 @@ public class GridMapQueryExecutor {
         finally {
             h2.setFilters(null);
 
+            // Release reserved partitions.
             for (GridDhtLocalPartition part : reserved)
                 part.release();
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/081e75bc/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 3391c97..68c7048 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -24,6 +24,7 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.query.*;
@@ -286,20 +287,17 @@ public class GridReduceQueryExecutor {
 
             r.conn = (JdbcConnection)h2.connectionForSpace(space);
 
-            final long topVer = ctx.cluster().get().topologyVersion();
+            AffinityTopologyVersion topVer = 
ctx.discovery().topologyVersionEx();
 
-            // TODO get projection for this topology version.
-            ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space);
+            Collection<ClusterNode> nodes = 
ctx.discovery().cacheAffinityNodes(space, topVer);
 
             if (cctx.isReplicated() || qry.explain()) {
-                assert qry.explain() || dataNodes.node(ctx.localNodeId()) == 
null : "We must be on a client node.";
+                assert qry.explain() || 
!nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client 
node.";
 
                 // Select random data node to run query on a replicated data 
or get EXPLAIN PLAN from a single node.
-                dataNodes = dataNodes.forRandom();
+                nodes = Collections.singleton(F.rand(nodes));
             }
 
-            final Collection<ClusterNode> nodes = dataNodes.nodes();
-
             for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
                 GridMergeTable tbl;
 

Reply via email to