#ignite-964: simple SQL query now returns its results in pages (execute + fetch).
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ed7dd30e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ed7dd30e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ed7dd30e Branch: refs/heads/ignite-964 Commit: ed7dd30ed613657e0d2576260aeebdf5c181e7a2 Parents: 1d03ca2 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Fri Jun 26 19:46:37 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Fri Jun 26 19:46:37 2015 +0300 ---------------------------------------------------------------------- .../rest/handlers/query/CacheQueryResult.java | 32 ++--- .../handlers/query/QueryCommandHandler.java | 117 +++++++++++++++++-- .../IgniteScriptingCommandHandler.java | 6 + .../rest/request/RestSqlQueryRequest.java | 17 +++ modules/nodejs/src/main/js/cache.js | 18 ++- modules/nodejs/src/main/js/sql-query.js | 11 +- modules/nodejs/src/test/js/test-query.js | 22 ++-- .../http/jetty/GridJettyRestHandler.java | 12 ++ 8 files changed, 192 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/CacheQueryResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/CacheQueryResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/CacheQueryResult.java index ede8a45..3e49576 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/CacheQueryResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/CacheQueryResult.java @@ -38,70 +38,57 @@ public class CacheQueryResult implements Externalizable { 
/** Last flag. */ private boolean last; - /** Node ID. */ - private UUID nodeId; - /** * @return Query ID. */ - public long queryId() { + public long getQueryId() { return qryId; } /** * @param qryId Query ID. */ - public void queryId(long qryId) { + public void setQueryId(long qryId) { this.qryId = qryId; } /** * @return Items. */ - public Collection<?> items() { + public Collection<?> getItems() { return items; } /** * @param items Items. */ - public void items(Collection<?> items) { + public void setItems(Collection<?> items) { this.items = items; } /** * @return Last flag. */ - public boolean last() { + public boolean getLast() { return last; } /** * @param last Last flag. */ - public void last(boolean last) { + public void setLast(boolean last) { this.last = last; } - /** - * @return Node ID. - */ - public UUID nodeId() { - return nodeId; - } - - /** - * @param nodeId Node ID. - */ - public void nodeId(UUID nodeId) { - this.nodeId = nodeId; + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheQueryResult.class, this); } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeBoolean(last); out.writeLong(qryId); - U.writeUuid(out, nodeId); U.writeCollection(out, items); } @@ -109,7 +96,6 @@ public class CacheQueryResult implements Externalizable { @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { last = in.readBoolean(); qryId = in.readLong(); - nodeId = U.readUuid(in); items = U.readCollection(in); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java index f31e246..82f3726 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.rest.handlers.query; -import org.apache.ignite.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.rest.*; @@ -29,6 +28,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import javax.cache.*; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import static org.apache.ignite.internal.processors.rest.GridRestCommand.*; @@ -37,7 +37,15 @@ import static org.apache.ignite.internal.processors.rest.GridRestCommand.*; */ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Supported commands. */ - private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(EXECUTE_SQL_QUERY); + private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(EXECUTE_SQL_QUERY, + FETCH_SQL_QUERY); + + /** Query ID sequence. */ + private static final AtomicLong qryIdGen = new AtomicLong(); + + /** Current queries. */ + private final ConcurrentHashMap<Long, Iterator<Cache.Entry<String, String>>> curs = + new ConcurrentHashMap<>(); /** * @param ctx Context. 
@@ -61,7 +69,15 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { case EXECUTE_SQL_QUERY: { assert req instanceof RestSqlQueryRequest : "Invalid type of query request."; - return ctx.closure().callLocalSafe(new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req),false); + return ctx.closure().callLocalSafe( + new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req, curs), false); + } + + case FETCH_SQL_QUERY: { + assert req instanceof RestSqlQueryRequest : "Invalid type of query request."; + + return ctx.closure().callLocalSafe( + new FetchQueryCallable(ctx, (RestSqlQueryRequest)req, curs), false); } } @@ -72,19 +88,27 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { * Execute query callable. */ private static class ExecuteQueryCallable implements Callable<GridRestResponse> { + /** */ + private static final long serialVersionUID = 0L; + /** Kernal context. */ private GridKernalContext ctx; /** Execute query request. */ private RestSqlQueryRequest req; + /** Queries cursors. */ + private ConcurrentHashMap<Long, Iterator<Cache.Entry<String, String>>> curs; + /** * @param ctx Kernal context. * @param req Execute query request. 
*/ - public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req) { + public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req, + ConcurrentHashMap<Long, Iterator<Cache.Entry<String, String>>> curs) { this.ctx = ctx; this.req = req; + this.curs = curs; } /** {@inheritDoc} */ @@ -92,11 +116,90 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { try { SqlQuery<String, String> qry = new SqlQuery(String.class, req.sqlQuery()); - IgniteCache<Object, Object> cache = ctx.grid().cache(req.cacheName()); + Iterator<Cache.Entry<String, String>> cur = + ctx.grid().cache(req.cacheName()).query(qry).iterator(); + + long qryId = qryIdGen.getAndIncrement(); + + curs.put(qryId, cur); + + List<Cache.Entry<String, String>> res = new ArrayList<>(); + + CacheQueryResult response = new CacheQueryResult(); + + for (int i = 0; i < req.pageSize() && cur.hasNext(); ++i) + res.add(cur.next()); + + response.setItems(res); + + response.setLast(!cur.hasNext()); + + response.setQueryId(qryId); + + if (!cur.hasNext()) + curs.remove(qryId); + + return new GridRestResponse(response); + } + catch (Exception e) { + return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); + } + } + } + + /** + * Fetch query callable. + */ + private static class FetchQueryCallable implements Callable<GridRestResponse> { + /** */ + private static final long serialVersionUID = 0L; + + /** Kernal context. */ + private GridKernalContext ctx; + + /** Execute query request. */ + private RestSqlQueryRequest req; + + /** Queries cursors. */ + private ConcurrentHashMap<Long, Iterator<Cache.Entry<String, String>>> curs; + + /** + * @param ctx Kernal context. + * @param req Execute query request. 
+ */ + public FetchQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req, + ConcurrentHashMap<Long, Iterator<Cache.Entry<String, String>>> curs) { + this.ctx = ctx; + this.req = req; + this.curs = curs; + } + + /** {@inheritDoc} */ + @Override public GridRestResponse call() throws Exception { + try { + if (curs.contains(req.queryId())) + return new GridRestResponse(GridRestResponse.STATUS_FAILED, + "Cannot find query [qryId=" + req.queryId() + "]"); + + Iterator<Cache.Entry<String, String>> cur = curs.get(req.queryId()); + + List<Cache.Entry<String, String>> res = new ArrayList<>(); + + CacheQueryResult response = new CacheQueryResult(); + + for (int i = 0; i < req.pageSize() && cur.hasNext(); ++i) + res.add(cur.next()); + + response.setItems(res); + + response.setLast(!cur.hasNext()); + + response.setQueryId(req.queryId()); - List<Cache.Entry<String, String>> res = cache.query(qry).getAll(); + if (!cur.hasNext()) + curs.remove(req.queryId()); - return new GridRestResponse(res); + return new GridRestResponse(response); } catch (Exception e) { return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/IgniteScriptingCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/IgniteScriptingCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/IgniteScriptingCommandHandler.java index 2d65016..d7525a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/IgniteScriptingCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/scripting/IgniteScriptingCommandHandler.java @@ -260,6 +260,9 @@ public class 
IgniteScriptingCommandHandler extends GridRestCommandHandlerAdapter * Run script callable. */ private static class RunScriptCallable implements Callable<GridRestResponse> { + /** */ + private static final long serialVersionUID = 0L; + /** Kernal context. */ private GridKernalContext ctx; @@ -291,6 +294,9 @@ public class IgniteScriptingCommandHandler extends GridRestCommandHandlerAdapter * Map reduce callable. */ private static class MapReduceCallable implements Callable<GridRestResponse> { + /** */ + private static final long serialVersionUID = 0L; + /** Kernal context. */ private GridKernalContext ctx; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestSqlQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestSqlQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestSqlQueryRequest.java index 5731a03..3a0005c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestSqlQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestSqlQueryRequest.java @@ -33,6 +33,9 @@ public class RestSqlQueryRequest extends GridRestRequest { /** Cache name. */ private String cacheName; + /** Query id. */ + private Long qryId; + /** * @param sqlQry Sql query. */ @@ -88,4 +91,18 @@ public class RestSqlQueryRequest extends GridRestRequest { public String cacheName() { return cacheName; } + + /** + * @param id Query id. + */ + public void queryId(Long id) { + this.qryId = id; + } + + /** + * @return Query id. 
+ */ + public Long queryId() { + return qryId; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/nodejs/src/main/js/cache.js ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/main/js/cache.js b/modules/nodejs/src/main/js/cache.js index cdbd6d4..56a2def 100644 --- a/modules/nodejs/src/main/js/cache.js +++ b/modules/nodejs/src/main/js/cache.js @@ -134,21 +134,33 @@ Cache.prototype.getAll = function(keys, callback) { * @param {SqlQuery} qry Query */ Cache.prototype.query = function(qry) { - function onQuery(qry, error, res) { + function onQueryExecute(qry, error, res) { if (error) { qry.error(error); return; } - qry.end(res); + qry.page(res["items"]); + + if (res["last"]) { + qry.end(); + } + else { + this._server.runCommand("qryfetch", [ + Server.pair("cacheName", this._cacheName), + Server.pair("qryId", res.queryId), + Server.pair("psz", qry.pageSize())], + onQueryExecute.bind(this, qry, res["queryId"])); + } } this._server.runCommand("qryexecute", [ Server.pair("cacheName", this._cacheName), Server.pair("qry", qry.query()), Server.pair("arg", qry.arguments()), - Server.pair("psz", qry.pageSize())], onQuery.bind(null, qry)); + Server.pair("psz", qry.pageSize())], + onQueryExecute.bind(this, qry)); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/nodejs/src/main/js/sql-query.js ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/main/js/sql-query.js b/modules/nodejs/src/main/js/sql-query.js index ea3b23b..ba554a3 100644 --- a/modules/nodejs/src/main/js/sql-query.js +++ b/modules/nodejs/src/main/js/sql-query.js @@ -22,8 +22,9 @@ function SqlQuery(sql) { this._sql = sql; this._arg = []; - this._pageSz = 0; + this._pageSz = 1; this._endFunc = function(res) {console.log("Empty end function is called [res=" + res + "]")}; + this._pageFunc = function(res) {console.log("Empty page function is called 
[res=" + res + "]")} this._errFunc = function(err) {console.log("Empty error function is called [err=" + err + "]")} } @@ -33,6 +34,10 @@ SqlQuery.prototype.on = function(code, f) { this._endFunc = f; break; + case "page": + this._pageFunc = f; + + break; case "error" : this._errFunc = f; @@ -50,6 +55,10 @@ SqlQuery.prototype.error = function(err) { return this._errFunc(err); } +SqlQuery.prototype.page = function(res) { + return this._pageFunc(res); +} + SqlQuery.prototype.query = function() { return this._sql; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/nodejs/src/test/js/test-query.js ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/test/js/test-query.js b/modules/nodejs/src/test/js/test-query.js index a09c5da..bc0fdc2 100644 --- a/modules/nodejs/src/test/js/test-query.js +++ b/modules/nodejs/src/test/js/test-query.js @@ -28,21 +28,25 @@ testSqlQuery = function() { var qry = new SqlQuery("select * from String"); - qry.on("error", function(err) { - console.log("!!!!!!!!!!!!Error: " + err); + var fullRes = []; + qry.on("error", function(err) { TestUtils.testFails(); }); - qry.on("end", function(res) { - assert(res.length, 1, "Result length is not correct" + - "[expected=1, val = " + res.length + "]"); + qry.on("page", function(res) { + fullRes = fullRes.concat(res); + }); + + qry.on("end", function() { + assert(fullRes.length, 1, "Result length is not correct" + + "[expected=1, val = " + fullRes.length + "]"); - assert(res[0]["key"] === "key0", "Result value for key is not correct "+ - "[expected=key0, real=" + res[0]["key"] + "]"); + assert(fullRes[0]["key"] === "key0", "Result value for key is not correct "+ + "[expected=key0, real=" + fullRes[0]["key"] + "]"); - assert(res[0]["value"] === "val0", "Result value for key is not correct "+ - "[expected=val0, real=" + res[0]["value"] + "]"); + assert(fullRes[0]["value"] === "val0", "Result value for key is not correct "+ + 
"[expected=val0, real=" + fullRes[0]["value"] + "]"); TestUtils.testDone(); }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed7dd30e/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java ---------------------------------------------------------------------- diff --git a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java index c188439..361b713 100644 --- a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java +++ b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java @@ -484,6 +484,18 @@ public class GridJettyRestHandler extends AbstractHandler { break; } + case FETCH_SQL_QUERY: { + RestSqlQueryRequest restReq0 = new RestSqlQueryRequest(); + + restReq0.queryId(Long.parseLong((String)params.get("qryId"))); + restReq0.pageSize(Integer.parseInt((String)params.get("psz"))); + restReq0.cacheName((String)params.get("cacheName")); + + restReq = restReq0; + + break; + } + default: throw new IgniteCheckedException("Invalid command: " + cmd); }