ignite-484 - v1

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7000722c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7000722c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7000722c

Branch: refs/heads/ignite-484
Commit: 7000722cf550b4333eded7640d965583f2768bdf
Parents: e975b7a
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Tue May 12 08:19:40 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Tue May 12 08:19:40 2015 +0300

----------------------------------------------------------------------
 .../affinity/AffinityTopologyVersion.java       |   7 -
 .../messages/GridQueryNextPageResponse.java     |  39 ++++-
 .../query/h2/twostep/GridMapQueryExecutor.java  | 115 +++++++++++---
 .../h2/twostep/GridReduceQueryExecutor.java     | 156 +++++++++++--------
 4 files changed, 223 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7000722c/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
index 77f3359..650c047 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
@@ -76,13 +76,6 @@ public class AffinityTopologyVersion implements 
Comparable<AffinityTopologyVersi
     }
 
     /**
-     * @param topVer New topology version.
-     */
-    public void topologyVersion(long topVer) {
-        this.topVer = topVer;
-    }
-
-    /**
      * @return Minor topology version.
      */
     public int minorTopologyVersion() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7000722c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
index 4fdc027..c2cca75 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
@@ -33,6 +33,12 @@ public class GridQueryNextPageResponse implements Message {
     private static final long serialVersionUID = 0L;
 
     /** */
+    public static final byte CODE_OK = 0;
+
+    /** */
+    public static final byte CODE_RETRY = -1;
+
+    /** */
     private long qryReqId;
 
     /** */
@@ -55,6 +61,9 @@ public class GridQueryNextPageResponse implements Message {
     @GridDirectTransient
     private transient Collection<?> plainRows;
 
+    /** Response code. */
+    private byte code = CODE_OK;
+
     /**
      * For {@link Externalizable}.
      */
@@ -86,6 +95,20 @@ public class GridQueryNextPageResponse implements Message {
     }
 
     /**
+     * @return Response code.
+     */
+    public byte code() {
+        return code;
+    }
+
+    /**
+     * @param code Response code.
+     */
+    public void code(byte code) {
+        this.code = code;
+    }
+
+    /**
      * @return Query request ID.
      */
     public long queryRequestId() {
@@ -186,6 +209,12 @@ public class GridQueryNextPageResponse implements Message {
                     return false;
 
                 writer.incrementState();
+
+            case 6:
+                if (!writer.writeByte("code", code))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -247,6 +276,14 @@ public class GridQueryNextPageResponse implements Message {
 
                 reader.incrementState();
 
+            case 6:
+                code = reader.readByte("code");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return true;
@@ -259,6 +296,6 @@ public class GridQueryNextPageResponse implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 6;
+        return 7;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7000722c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index f15a2da..2483912 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -23,7 +23,9 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.query.h2.*;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
@@ -36,6 +38,7 @@ import org.h2.jdbc.*;
 import org.h2.result.*;
 import org.h2.store.*;
 import org.h2.value.*;
+import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
 import javax.cache.*;
@@ -198,6 +201,16 @@ public class GridMapQueryExecutor {
     }
 
     /**
+     * @param cacheName Cache name.
+     * @return Cache context or {@code null} if none.
+     */
+    @Nullable private GridCacheContext<?,?> cacheContext(String cacheName) {
+        GridCacheAdapter<?,?> cache = ctx.cache().internalCache(cacheName);
+
+        return cache == null ? null : cache.context();
+    }
+
+    /**
      * Executing queries locally.
      *
      * @param node Node.
@@ -206,32 +219,75 @@ public class GridMapQueryExecutor {
     private void onQueryRequest(ClusterNode node, GridQueryRequest req) {
         ConcurrentMap<Long,QueryResults> nodeRess = resultsForNode(node.id());
 
-        Collection<GridCacheSqlQuery> qrys;
+        QueryResults qr = null;
+
+        List<GridDhtLocalPartition> reserved = new ArrayList<>();
 
         try {
-            qrys = req.queries();
+            Collection<GridCacheSqlQuery> qrys;
 
-            if (!node.isLocal()) {
-                Marshaller m = ctx.config().getMarshaller();
+            try {
+                qrys = req.queries();
+
+                if (!node.isLocal()) {
+                    Marshaller m = ctx.config().getMarshaller();
 
-                for (GridCacheSqlQuery qry : qrys)
-                    qry.unmarshallParams(m);
+                    for (GridCacheSqlQuery qry : qrys)
+                        qry.unmarshallParams(m);
+                }
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
             }
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
 
-        GridCacheContext<?,?> cctx = 
ctx.cache().internalCache(req.space()).context();
+            List<GridCacheContext<?,?>> cctxs = new ArrayList<>();
 
-        QueryResults qr = new QueryResults(req.requestId(), qrys.size(), cctx);
+            for (String cacheName : F.concat(true, req.space(), 
req.extraSpaces())) {
+                GridCacheContext<?,?> cctx = cacheContext(cacheName);
 
-        if (nodeRess.put(req.requestId(), qr) != null)
-            throw new IllegalStateException();
+                if (cctx == null) { // Cache was not deployed yet.
+                    sendRetry(node, req.requestId());
 
-        h2.setFilters(h2.backupFilter());
+                    return;
+                }
+                else
+                    cctxs.add(cctx);
+            }
+
+            for (GridCacheContext<?,?> cctx : cctxs) { // Lock primary 
partitions.
+                // TODO how to get all partitions for topology version 
consistently?
+                List<GridDhtLocalPartition> parts = 
cctx.topology().localPartitions();
+                AffinityTopologyVersion affTopVer = 
cctx.topology().topologyVersion();
+
+                if (affTopVer.topologyVersion() != req.topologyVersion()) {
+                    sendRetry(node, req.requestId());
+
+                    return;
+                }
+
+                for (GridDhtLocalPartition part : parts) {
+                    if (!part.primary(affTopVer))
+                        continue;
+
+                    if (!part.reserve()) {
+                        sendRetry(node, req.requestId());
+
+                        return;
+                    }
+
+                    reserved.add(part);
+                }
+            }
+
+            GridCacheContext<?,?> cctx = cctxs.get(0); // Main cache context.
+
+            qr = new QueryResults(req.requestId(), qrys.size(), cctx);
+
+            if (nodeRess.put(req.requestId(), qr) != null)
+                throw new IllegalStateException();
+
+            h2.setFilters(h2.backupFilter());
 
-        try {
             // TODO Prepare snapshots for all the needed tables before the run.
 
             // Run queries.
@@ -276,9 +332,11 @@ public class GridMapQueryExecutor {
             }
         }
         catch (Throwable e) {
-            nodeRess.remove(req.requestId(), qr);
+            if (qr != null) {
+                nodeRess.remove(req.requestId(), qr);
 
-            qr.cancel();
+                qr.cancel();
+            }
 
             U.error(log, "Failed to execute local query: " + req, e);
 
@@ -289,6 +347,9 @@ public class GridMapQueryExecutor {
         }
         finally {
             h2.setFilters(null);
+
+            for (GridDhtLocalPartition part : reserved)
+                part.release();
         }
     }
 
@@ -375,6 +436,24 @@ public class GridMapQueryExecutor {
     }
 
     /**
+     * @param node Node.
+     * @param reqId Request ID.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void sendRetry(ClusterNode node, long reqId) throws 
IgniteCheckedException {
+        boolean loc = node.isLocal();
+
+        GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId,
+            /*qry*/0, /*page*/0, /*allRows*/0, /*cols*/1,
+            loc ? null : Collections.<Message>emptyList(),
+            loc ? Collections.<Value[]>emptyList() : null);
+
+        msg.code(GridQueryNextPageResponse.CODE_RETRY);
+
+        ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, 
GridIoPolicy.PUBLIC_POOL);
+    }
+
+    /**
      * @param bytes Bytes.
      * @return Rows.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7000722c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 2e69286..3391c97 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -260,6 +260,9 @@ public class GridReduceQueryExecutor {
 
         idx.addPage(page);
 
+        if (msg.code() == GridQueryNextPageResponse.CODE_RETRY)
+            r.retry = true;
+
         if (msg.allRows() != -1) // Only the first page contains row count.
             r.latch.countDown();
     }
@@ -270,109 +273,123 @@ public class GridReduceQueryExecutor {
      * @return Cursor.
      */
     public QueryCursor<List<?>> query(GridCacheContext<?,?> cctx, 
GridCacheTwoStepQuery qry) {
-        long qryReqId = reqIdGen.incrementAndGet();
+        for (int attempt = 0;; attempt++) {
+            long qryReqId = reqIdGen.incrementAndGet();
 
-        QueryRun r = new QueryRun();
+            QueryRun r = new QueryRun();
 
-        r.pageSize = qry.pageSize() <= 0 ? 
GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize();
+            r.pageSize = qry.pageSize() <= 0 ? 
GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize();
 
-        r.tbls = new ArrayList<>(qry.mapQueries().size());
+            r.tbls = new ArrayList<>(qry.mapQueries().size());
 
-        String space = cctx.name();
+            String space = cctx.name();
 
-        r.conn = (JdbcConnection)h2.connectionForSpace(space);
+            r.conn = (JdbcConnection)h2.connectionForSpace(space);
 
-        // TODO Add topology version.
-        ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space);
+            final long topVer = ctx.cluster().get().topologyVersion();
 
-        if (cctx.isReplicated() || qry.explain()) {
-            assert qry.explain() || dataNodes.node(ctx.localNodeId()) == null 
: "We must be on a client node.";
+            // TODO get projection for this topology version.
+            ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space);
 
-            // Select random data node to run query on a replicated data or 
get EXPLAIN PLAN from a single node.
-            dataNodes = dataNodes.forRandom();
-        }
+            if (cctx.isReplicated() || qry.explain()) {
+                assert qry.explain() || dataNodes.node(ctx.localNodeId()) == 
null : "We must be on a client node.";
 
-        final Collection<ClusterNode> nodes = dataNodes.nodes();
+                // Select random data node to run query on a replicated data 
or get EXPLAIN PLAN from a single node.
+                dataNodes = dataNodes.forRandom();
+            }
 
-        for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
-            GridMergeTable tbl;
+            final Collection<ClusterNode> nodes = dataNodes.nodes();
 
-            try {
-                tbl = createFunctionTable(r.conn, mapQry, qry.explain()); // 
createTable(r.conn, mapQry); TODO
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
+            for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
+                GridMergeTable tbl;
 
-            GridMergeIndex idx = tbl.getScanIndex(null);
+                try {
+                    tbl = createFunctionTable(r.conn, mapQry, qry.explain()); 
// createTable(r.conn, mapQry); TODO
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
 
-            for (ClusterNode node : nodes)
-                idx.addSource(node.id());
+                GridMergeIndex idx = tbl.getScanIndex(null);
 
-            r.tbls.add(tbl);
+                for (ClusterNode node : nodes)
+                    idx.addSource(node.id());
 
-            curFunTbl.set(tbl);
-        }
+                r.tbls.add(tbl);
 
-        r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
+                curFunTbl.set(tbl);
+            }
 
-        runs.put(qryReqId, r);
+            r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
 
-        try {
-            Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
+            runs.put(qryReqId, r);
 
-            if (qry.explain()) {
-                mapQrys = new ArrayList<>(qry.mapQueries().size());
+            try {
+                Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
 
-                for (GridCacheSqlQuery mapQry : qry.mapQueries())
-                    mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), "EXPLAIN 
" + mapQry.query(), mapQry.parameters()));
-            }
+                if (qry.explain()) {
+                    mapQrys = new ArrayList<>(qry.mapQueries().size());
 
-            if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall 
params for remotes.
-                Marshaller m = ctx.config().getMarshaller();
+                    for (GridCacheSqlQuery mapQry : qry.mapQueries())
+                        mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), 
"EXPLAIN " + mapQry.query(), mapQry.parameters()));
+                }
 
-                for (GridCacheSqlQuery mapQry : mapQrys)
-                    mapQry.marshallParams(m);
-            }
+                if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // 
Marshall params for remotes.
+                    Marshaller m = ctx.config().getMarshaller();
 
-            send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, 
mapQrys,
-                ctx.cluster().get().topologyVersion(),
-                extraSpaces(space, qry.spaces())));
+                    for (GridCacheSqlQuery mapQry : mapQrys)
+                        mapQry.marshallParams(m);
+                }
+
+                send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, 
mapQrys, topVer,
+                    extraSpaces(space, qry.spaces())));
 
-            r.latch.await();
+                U.await(r.latch);
 
-            if (r.rmtErr != null)
-                throw new CacheException("Failed to run map query remotely.", 
r.rmtErr);
+                if (r.rmtErr != null)
+                    throw new CacheException("Failed to run map query 
remotely.", r.rmtErr);
 
-            if (qry.explain())
-                return explainPlan(r.conn, space, qry);
+                ResultSet res = null;
 
-            GridCacheSqlQuery rdc = qry.reduceQuery();
+                if (!r.retry) {
+                    if (qry.explain())
+                        return explainPlan(r.conn, space, qry);
 
-            final ResultSet res = h2.executeSqlQueryWithTimer(space, r.conn, 
rdc.query(), F.asList(rdc.parameters()));
+                    GridCacheSqlQuery rdc = qry.reduceQuery();
+
+                    res = h2.executeSqlQueryWithTimer(space, r.conn, 
rdc.query(), F.asList(rdc.parameters()));
+                }
 
-            for (GridMergeTable tbl : r.tbls) {
-                if (!tbl.getScanIndex(null).fetchedAll()) // We have to 
explicitly cancel queries on remote nodes.
-                    send(nodes, new GridQueryCancelRequest(qryReqId));
+                for (GridMergeTable tbl : r.tbls) {
+                    if (!tbl.getScanIndex(null).fetchedAll()) // We have to 
explicitly cancel queries on remote nodes.
+                        send(nodes, new GridQueryCancelRequest(qryReqId));
 
 //                dropTable(r.conn, tbl.getName()); TODO
-            }
+                }
 
-            return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new 
Iter(res), cctx, cctx.keepPortable()));
-        }
-        catch (IgniteCheckedException | InterruptedException | 
RuntimeException e) {
-            U.closeQuiet(r.conn);
+                if (r.retry) {
+                    if (attempt > 0)
+                        U.sleep(attempt * 10);
+
+                    continue;
+                }
+
+                return new QueryCursorImpl<>(new 
GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable()));
+            }
+            catch (IgniteCheckedException | RuntimeException e) {
+                U.closeQuiet(r.conn);
 
-            if (e instanceof CacheException)
-                throw (CacheException)e;
+                if (e instanceof CacheException)
+                    throw (CacheException)e;
 
-            throw new CacheException("Failed to run reduce query locally.", e);
-        }
-        finally {
-            if (!runs.remove(qryReqId, r))
-                U.warn(log, "Query run was already removed: " + qryReqId);
+                throw new CacheException("Failed to run reduce query 
locally.", e);
+            }
+            finally {
+                if (!runs.remove(qryReqId, r))
+                    U.warn(log, "Query run was already removed: " + qryReqId);
 
-            curFunTbl.remove();
+                curFunTbl.remove();
+            }
         }
     }
 
@@ -680,6 +697,9 @@ public class GridReduceQueryExecutor {
 
         /** */
         private volatile CacheException rmtErr;
+
+        /** */
+        private volatile boolean retry;
     }
 
     /**

Reply via email to