#ignite-961: add promises
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8616eebb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8616eebb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8616eebb Branch: refs/heads/ignite-1121 Commit: 8616eebbfd8b89ed260cc86af9a7748a9a1cd3fd Parents: 4f8810c Author: ivasilinets <ivasilin...@gridgain.com> Authored: Thu Jul 16 12:25:37 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Thu Jul 16 12:25:37 2015 +0300 ---------------------------------------------------------------------- examples/src/main/js/cache-api-example.js | 58 +- examples/src/main/js/cache-put-get-example.js | 75 +- examples/src/main/js/cache-query-example.js | 73 +- .../main/js/cache-sql-fields-query-example.js | 64 +- examples/src/main/js/compute-run-example.js | 38 +- examples/src/main/js/map-reduce-example.js | 23 +- .../processors/rest/GridRestCommand.java | 5 +- .../handlers/query/QueryCommandHandler.java | 110 ++- modules/nodejs/src/main/js/cache.js | 371 ++++++--- modules/nodejs/src/main/js/compute.js | 36 +- modules/nodejs/src/main/js/ignite.js | 107 +-- modules/nodejs/src/main/js/ignition.js | 106 ++- modules/nodejs/src/main/js/package.json | 5 +- modules/nodejs/src/main/js/server.js | 2 + .../ignite/internal/NodeJsIgnitionSelfTest.java | 12 +- .../ignite/internal/NodeJsSqlQuerySelfTest.java | 18 + modules/nodejs/src/test/js/test-cache-api.js | 799 ++++++++++-------- modules/nodejs/src/test/js/test-compute.js | 814 +++++++++---------- modules/nodejs/src/test/js/test-ignite.js | 104 +-- modules/nodejs/src/test/js/test-ignition.js | 78 +- modules/nodejs/src/test/js/test-key.js | 56 +- modules/nodejs/src/test/js/test-query.js | 169 ++-- modules/nodejs/src/test/js/test-utils.js | 11 +- .../http/jetty/GridJettyRestHandler.java | 11 + 24 files changed, 1760 insertions(+), 1385 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/examples/src/main/js/cache-api-example.js ---------------------------------------------------------------------- diff --git a/examples/src/main/js/cache-api-example.js b/examples/src/main/js/cache-api-example.js index f708160..1d4ddda 100644 --- a/examples/src/main/js/cache-api-example.js +++ b/examples/src/main/js/cache-api-example.js @@ -31,69 +31,57 @@ function main() { var cacheName = "ApiExampleCache"; /** Connect to node that started with {@code examples/config/js/example-js-cache.xml} configuration. */ - Ignition.start(['127.0.0.1:8000..9000'], null, onConnect); - - function onConnect(err, ignite) { - if (err !== null) - throw "Start remote node with config examples/config/example-ignite.xml."; - + Ignition.start(['127.0.0.1:8000..9000'], null).then(function(ignite) { console.log(">>> Cache API example started."); // Create cache on server with cacheName. - ignite.getOrCreateCache(cacheName, function(err, cache) { + ignite.getOrCreateCache(cacheName).then(function(cache){ atomicMapOperations(ignite, cache); }); - } + }).catch(function(err) { + if (err !== null) + console.log("Start remote node with config examples/config/example-ignite.xml."); + }); /** * Demonstrates cache operations similar to {@link ConcurrentMap} API. Note that * cache API is a lot richer than the JDK {@link ConcurrentMap}. */ - atomicMapOperations = function(ignite, cache) { + function atomicMapOperations (ignite, cache) { console.log(">>> Cache atomic map operation examples."); - cache.removeAllFromCache(function(err) { + cache.removeAllFromCache().then(function(){ // Put and return previous value. - cache.getAndPut(1, "1", onGetAndPut) - }); - - function onGetAndPut(err, entry) { + return cache.getAndPut(1, "1"); + }).then(function(entry){ console.log(">>> Get and put finished [result=" + entry + "]"); // Put and do not return previous value. // Performs better when previous value is not needed. - cache.put(2, "2", onPut); - } - - onPut = function(err) { + return cache.put(2, "2") + }).then(function(){ console.log(">>> Put finished."); // Put-if-absent. - cache.putIfAbsent(4, "44", onPutIfAbsent); - } - - onPutIfAbsent = function(err, res) { + return cache.putIfAbsent(4, "44"); + }).then(function(res){ console.log(">>> Put if absent finished [result=" + res + "]"); // Replace. - cache.replaceValue(4, "55", "44", onReplaceValue); - } - - onReplaceValue = function(err, res) { + return cache.replaceValue(4, "55", "44"); + }).then(function(res) { console.log(">>> Replace value finished [result=" + res + "]"); // Replace not correct value. - cache.replaceValue(4, "555", "44", onEnd); - } - - onEnd = function(err) { + return cache.replaceValue(4, "555", "44"); + }).then(function(res) { console.log(">>> Replace finished."); - // Destroying cache. - ignite.destroyCache(cacheName, function(err) { - console.log(">>> End of Cache API example."); - }); - } + //Destroying cache. + return ignite.destroyCache(cacheName); + }).then(function(){ + console.log(">>> End of Cache API example."); + }) } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/examples/src/main/js/cache-put-get-example.js ---------------------------------------------------------------------- diff --git a/examples/src/main/js/cache-put-get-example.js b/examples/src/main/js/cache-put-get-example.js index e12a54d..69478e9 100644 --- a/examples/src/main/js/cache-put-get-example.js +++ b/examples/src/main/js/cache-put-get-example.js @@ -32,48 +32,29 @@ function main() { var cacheName = "PutGetExampleCache"; /** Connect to node that started with {@code examples/config/js/example-js-cache.xml} configuration. */ - Ignition.start(['127.0.0.1:8000..9000'], null, onConnect); + Ignition.start(['127.0.0.1:8000..9000'], null).then(function(ignite) { + console.log(">>> Cache put-get example started."); - function onConnect(err, ignite) { + // Create cache on server with cacheName. + ignite.getOrCreateCache(cacheName).then(function(cache){ + putGetExample(ignite, cache); + }); + }).catch(function(err) { if (err !== null) - throw "Start remote node with config examples/config/example-ignite.xml."; - - ignite.getOrCreateCache(cacheName, function(err, cache) { putGetExample(ignite, cache); }); - } - - /** Execute individual puts and gets. */ - putGetExample = function(ignite, cache) { - console.log(">>> Cache put-get example started."); + console.log("Start remote node with config examples/config/example-ignite.xml."); + }); + /** Execute puts and gets. */ + function putGetExample(ignite, cache) { var key = 1; - // Store key in cache. - cache.put(key, "1", onPut); - - function onPut(err) { - console.log(">>> Stored values in cache."); - - cache.get(key, onGet); - } - - function onGet(err, res) { - console.log("Get value=" + res); - - putAllGetAll(ignite, cache); - } - } - - /** Execute bulk {@code putAll(...)} and {@code getAll(...)} operations. */ - function putAllGetAll(ignite, cache) { - console.log(">>> Starting putAll-getAll example."); - var keyCnt = 20; // Create batch. var batch = []; var keys = []; - for (var i = keyCnt; i < keyCnt + keyCnt; ++i) { + for (var i = 0; i < keyCnt; ++i) { var key = i; var val = "bulk-" + i; @@ -81,24 +62,34 @@ function main() { batch.push(new CacheEntry(key, val)); } - // Bulk-store entries in cache. - cache.putAll(batch, onPutAll); - - function onPutAll(err) { + // Store key in cache. + cache.put(key, "1").then(function(){ console.log(">>> Stored values in cache."); - // Bulk-get values from cache. - cache.getAll(keys, onGetAll); - } + // Get value. + return cache.get(key); + }).then(function(entry){ + console.log(">>> Get finished [result=" + entry + "]"); + + console.log(">>> Starting putAll-getAll example."); - function onGetAll(err, entries) { + // Bulk-store entries in cache. + return cache.putAll(batch); + }).then(function(){ + console.log(">>> Stored values in cache."); + + // GetAll keys. + return cache.getAll(keys); + }).then(function(entries){ for (var e of entries) { - console.log("Got entry [key=" + e.key + ", value=" + e.value + ']'); + console.log(">>> Got entry [key=" + e.key + ", value=" + e.value + ']'); } // Destroying cache. - ignite.destroyCache(cacheName, function(err) { console.log(">>> End of cache put-get example."); }); - } + return ignite.destroyCache(cacheName); + }).then(function(){ + console.log(">>> End of cache put-get example.") + }) } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/examples/src/main/js/cache-query-example.js ---------------------------------------------------------------------- diff --git a/examples/src/main/js/cache-query-example.js b/examples/src/main/js/cache-query-example.js index 1c6b980..8b455b5 100644 --- a/examples/src/main/js/cache-query-example.js +++ b/examples/src/main/js/cache-query-example.js @@ -36,26 +36,25 @@ main = function() { var cacheName = "CacheQueryExample"; /** Connect to node that started with {@code examples/config/js/example-js-cache.xml} configuration. */ - Ignition.start(['127.0.0.1:8000..9000'], null, onConnect); - - function onConnect(err, ignite) { - if (err !== null) - throw "Start remote node with config examples/config/example-ignite.xml."; - + Ignition.start(['127.0.0.1:8000..9000'], null).then(function(ignite) { console.log(">>> Cache query example started."); - var entries = initializeEntries(); - - ignite.getOrCreateCache(cacheName, function(err, cache) { - cacheQuery(ignite, cache, entries); + // Create cache on server with cacheName. + ignite.getOrCreateCache(cacheName).then(function(cache){ + cacheQuery(ignite, cache); }); - } + }).catch(function(err) { + if (err !== null) + console.log("Start remote node with config examples/config/example-ignite.xml."); + }); - function cacheQuery(ignite, cache, entries) { - cache.putAll(entries, onCachePut); + // Run query example. + function cacheQuery(ignite, cache) { + var entries = initializeEntries(); - function onCachePut(err) { - console.log(">>> Create cache for people.") + // Initialize cache. + cache.putAll(entries).then(function(){ + console.log(">>> Create cache for people."); //SQL clause which selects salaries based on range. var qry = new SqlQuery("salary > ? and salary <= ?"); @@ -69,27 +68,37 @@ main = function() { var fullRes = []; - //This function is called when we get part of query result. - qry.on("page", function(res) { - console.log(">>> Get result on page: " + JSON.stringify(res)); + // Get query cursor. + var cursor = ignite.cache(cacheName).query(qry); - fullRes = fullRes.concat(res); - }); + function onQuery(cursor) { + var page = cursor.page(); - //This function is called when query is finished. - qry.on("end", function(err) { - console.log(">>> People with salaries between 0 and 2000 (queried with SQL query): " + - JSON.stringify(fullRes)); + console.log(">>> Get result on page: " + JSON.stringify(page)); - // Destroying cache. - ignite.destroyCache(cacheName, function(err) { - console.log(">>> End of query example."); - }); - }); + //Concat query page results. + fullRes.concat(page); + + // IsFinished return true if it is the last page. + if (cursor.isFinished()) { + console.log(">>> People with salaries between 0 and 2000 (queried with SQL query): " + + JSON.stringify(fullRes)); + + //Destroying cache on the end of the example. + return ignite.destroyCache(cacheName); + } - //Run query. - ignite.cache(cacheName).query(qry); - } + //Get Promise for next page. + var nextPromise = cursor.nextPage(); + + return nextPromise.then(onQuery); + } + + // Get query's page. + return cursor.nextPage().then(onQuery).then(function(){ + console.log(">>> End of sql query example."); + }); + }) } // Initialize cache for people. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/examples/src/main/js/cache-sql-fields-query-example.js ---------------------------------------------------------------------- diff --git a/examples/src/main/js/cache-sql-fields-query-example.js b/examples/src/main/js/cache-sql-fields-query-example.js index 58eb26b..7290581 100644 --- a/examples/src/main/js/cache-sql-fields-query-example.js +++ b/examples/src/main/js/cache-sql-fields-query-example.js @@ -22,6 +22,7 @@ var SqlQuery = apacheIgnite.SqlQuery; var SqlFieldsQuery = apacheIgnite.SqlFieldsQuery; var CacheEntry = apacheIgnite.CacheEntry; + /** * Cache queries example. This example demonstrates SQL queries over cache. * <p> @@ -32,24 +33,71 @@ var CacheEntry = apacheIgnite.CacheEntry; */ main = function() { /** Cache name. */ - var cacheName = "CacheSqlFieldsQueryExample"; + var cacheName = "CacheQueryExample"; /** Connect to node that started with {@code examples/config/js/example-js-cache.xml} configuration. */ - Ignition.start(['127.0.0.1:8000..9000'], null, onConnect); + Ignition.start(['127.0.0.1:8000..9000'], null).then(function(ignite) { + console.log(">>> Cache sql fields query example started."); - function onConnect(err, ignite) { + // Create cache on server with cacheName. + ignite.getOrCreateCache(cacheName).then(function(cache){ + cacheSqlFieldsQuery(ignite, cache); + }); + }).catch(function(err) { if (err !== null) - throw "Start remote node with config examples/config/example-ignite.xml."; - - console.log(">>> Cache sql fields query example started."); + console.log("Start remote node with config examples/config/example-ignite.xml."); + }); + // Run query example. + function cacheSqlFieldsQuery(ignite, cache) { var entries = initializeEntries(); - ignite.getOrCreateCache(cacheName, function(err, cache) { - cacheSqlFieldsQuery(ignite, cache, entries); + // Initialize cache. + cache.putAll(entries).then(function(){ + console.log(">>> Create cache for people."); + + //Sql query to get names of all employees. + var qry = new SqlFieldsQuery("select concat(firstName, ' ', lastName) from Person"); + + // Set page size for query. + qry.setPageSize(2); + + //Set salary range. + qry.setArguments([0, 2000]); + + // Run query. + ignite.cache(cacheName).query(qry).getAll(function(fullRes){ + console.log(">>> Names of all employees: " + JSON.stringify(fullRes)); + + // Destroying cache on the end of the example. + return ignite.destroyCache(cacheName); + }).then(function(){ + console.log(">>> End of sql fields query example."); + }); }); } + // Initialize cache for people. + function initializeEntries() { + var key1 = "1"; + var value1 = {"firstName" : "John", "lastName" : "Doe", "salary" : 2000}; + var key2 = "2"; + var value2 = {"firstName" : "Jane", "lastName" : "Doe", "salary" : 1000}; + var key3 = "3"; + var value3 = {"firstName" : "John", "lastName" : "Smith", "salary" : 1000}; + var key4 = "4"; + var value4 = {"firstName" : "Jane", "lastName" : "Smith", "salary" : 2000}; + var key5 = "5"; + var value5 = {"firstName" : "Ann", "lastName" : "Smith", "salary" : 3000}; + + return [new CacheEntry(key1, value1), new CacheEntry(key2, value2), + new CacheEntry(key3, value3), new CacheEntry(key4, value4)]; + } +} + +main(); + + function cacheSqlFieldsQuery(ignite, cache, entries) { cache.putAll(entries, onCachePut.bind(null, ignite)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/examples/src/main/js/compute-run-example.js ---------------------------------------------------------------------- diff --git a/examples/src/main/js/compute-run-example.js b/examples/src/main/js/compute-run-example.js index 189c63f..9d2a9ad 100644 --- a/examples/src/main/js/compute-run-example.js +++ b/examples/src/main/js/compute-run-example.js @@ -31,25 +31,24 @@ function main() { var cacheName = "RunCacheScriptCache"; /** Connect to node that started with {@code examples/config/js/example-js-cache.xml} configuration. */ - Ignition.start(['127.0.0.1:8000..9000'], null, onConnect); - - function onConnect(err, ignite) { - if (err !== null) - throw "Start remote node with config examples/config/example-ignite.xml."; - + Ignition.start(['127.0.0.1:8000..9000'], null).then(function(ignite) { console.log(">>> Run cache script example started."); - ignite.getOrCreateCache(cacheName, function(err, cache) { runCacheScript(ignite, cache); }); - } + // Create cache on server with cacheName. + ignite.getOrCreateCache(cacheName).then(function(cache){ + runCacheScript(ignite, cache); + }); + }).catch(function(err) { + if (err !== null) + console.log("Start remote node with config examples/config/example-ignite.xml."); + }); function runCacheScript(ignite, cache) { var key = "John"; var person = {"firstName": "John", "lastName": "Doe", "salary" : 2000}; // Store person in the cache - cache.put(key, person, onPut); - - function onPut(err) { + cache.put(key, person).then(function(){ var job = function (args) { print(">>> Hello node: " + ignite.name()); @@ -65,16 +64,15 @@ function main() { return val.salary; } - var onRun = function(err, salary) { - console.log(">>> " + key + "'s salary is " + salary); - - // Destroying cache. - ignite.destroyCache(cacheName, function(err) { console.log(">>> End of run cache script example."); }); - } - /** Run remote job on server ignite node with arguments [cacheName, key]. */ - ignite.compute().run(job, [cacheName, key], onRun); - } + return ignite.compute().run(job, [cacheName, key]); + }).then(function(salary){ + console.log(">>> " + key + "'s salary is " + salary); + + return ignite.destroyCache(cacheName); + }).then(function() { + console.log(">>> End of run cache script example."); + }); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/examples/src/main/js/map-reduce-example.js ---------------------------------------------------------------------- diff --git a/examples/src/main/js/map-reduce-example.js b/examples/src/main/js/map-reduce-example.js index db2da87..326fa3c 100644 --- a/examples/src/main/js/map-reduce-example.js +++ b/examples/src/main/js/map-reduce-example.js @@ -32,12 +32,7 @@ var Ignition = apacheIgnite.Ignition; */ function main() { /** Connect to node that started with {@code examples/config/js/example-js-cache.xml} configuration. */ - Ignition.start(['127.0.0.1:8000..9000'], null, onConnect); - - function onConnect(err, ignite) { - if (err !== null) - throw "Start remote node with config examples/config/example-ignite.xml."; - + Ignition.start(['127.0.0.1:8000..9000'], null).then(function(ignite) { console.log(">>> Compute map reduce example started."); /** @@ -72,14 +67,14 @@ function main() { return sum; } - // Called when map reduced finished. - var onMapReduce = function(err, cnt) { - console.log(">>> Total number of characters in the phrase is '" + cnt + "'."); - console.log(">>> End of compute map reduce example."); - } - - ignite.compute().mapReduce(map, reduce, "Hello Ignite Enabled World!", onMapReduce); - } + return ignite.compute().mapReduce(map, reduce, "Hello Ignite World!"); + }).then(function(cnt){ + console.log(">>> Total number of characters in the phrase is '" + cnt + "'."); + console.log(">>> End of compute map reduce example."); + }).catch(function(err) { + if (err !== null) + console.log("Start remote node with config examples/config/example-ignite.xml. "); + }); } main(); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java index 45e86e0..f5c2546 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java @@ -151,7 +151,10 @@ public enum GridRestCommand { EXECUTE_SQL_FIELDS_QUERY("qryfieldsexecute"), /** Fetch query results. */ - FETCH_SQL_QUERY("qryfetch"); + FETCH_SQL_QUERY("qryfetch"), + + /** Close query. */ + CLOSE_SQL_QUERY("qryclose"); /** Enum values. */ private static final GridRestCommand[] VALS = values(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java index ec9575c..e2118b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.rest.handlers.query; +import org.apache.ignite.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.rest.*; @@ -38,7 +39,8 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Supported commands. */ private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(EXECUTE_SQL_QUERY, EXECUTE_SQL_FIELDS_QUERY, - FETCH_SQL_QUERY); + FETCH_SQL_QUERY, + CLOSE_SQL_QUERY); /** Query ID sequence. */ private static final AtomicLong qryIdGen = new AtomicLong(); @@ -46,6 +48,9 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Current queries. */ private final ConcurrentHashMap<Long, Iterator> curs = new ConcurrentHashMap<>(); + /** Current queries cursors. */ + private final ConcurrentHashMap<Long, QueryCursor> qryCurs = new ConcurrentHashMap<>(); + /** * @param ctx Context. */ @@ -63,21 +68,23 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { assert req != null; assert SUPPORTED_COMMANDS.contains(req.command()); + assert req instanceof RestSqlQueryRequest : "Invalid type of query request."; switch (req.command()) { case EXECUTE_SQL_QUERY: case EXECUTE_SQL_FIELDS_QUERY: { - assert req instanceof RestSqlQueryRequest : "Invalid type of query request."; - return ctx.closure().callLocalSafe( - new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req, curs), false); + new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req, curs, qryCurs), false); } case FETCH_SQL_QUERY: { - assert req instanceof RestSqlQueryRequest : "Invalid type of query request."; + return ctx.closure().callLocalSafe( + new FetchQueryCallable((RestSqlQueryRequest)req, curs, qryCurs), false); + } + case CLOSE_SQL_QUERY: { return ctx.closure().callLocalSafe( - new FetchQueryCallable((RestSqlQueryRequest)req, curs), false); + new CloseQueryCallable((RestSqlQueryRequest)req, curs, qryCurs), false); } } @@ -94,19 +101,23 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Execute query request. */ private RestSqlQueryRequest req; - /** Queries cursors. */ + /** Queries iterators. */ private ConcurrentHashMap<Long, Iterator> curs; + /** Queries cursors. */ + private ConcurrentHashMap<Long, QueryCursor> qryCurs; + /** * @param ctx Kernal context. * @param req Execute query request. * @param curs Queries cursors. */ public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req, - ConcurrentHashMap<Long, Iterator> curs) { + ConcurrentHashMap<Long, Iterator> curs, ConcurrentHashMap<Long, QueryCursor> qryCurs) { this.ctx = ctx; this.req = req; this.curs = curs; + this.qryCurs = qryCurs; } /** {@inheritDoc} */ @@ -125,13 +136,22 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { ((SqlFieldsQuery)qry).setArgs(req.arguments()); } - Iterator cur = ctx.grid().cache(req.cacheName()).query(qry).iterator(); + IgniteCache<Object, Object> cache = ctx.grid().cache(req.cacheName()); + + if (cache == null) + return new GridRestResponse(GridRestResponse.STATUS_FAILED, + "No cache with name. [cacheName=" + req.cacheName() + "]"); + + QueryCursor qryCur = cache.query(qry); + + Iterator cur = qryCur.iterator(); long qryId = qryIdGen.getAndIncrement(); + qryCurs.put(qryId, qryCur); curs.put(qryId, cur); - CacheQueryResult res = createQueryResult(curs, cur, req, qryId); + CacheQueryResult res = createQueryResult(qryCurs, curs, cur, req, qryId); return new GridRestResponse(res); } @@ -142,23 +162,74 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { } /** + * Close query callable. + */ + private static class CloseQueryCallable implements Callable<GridRestResponse> { + /** Execute query request. */ + private RestSqlQueryRequest req; + + /** Queries iterators. */ + private ConcurrentHashMap<Long, Iterator> curs; + + /** Queries cursors. */ + private ConcurrentHashMap<Long, QueryCursor> qryCurs; + + /** + * @param req Execute query request. + * @param curs Queries cursors. + */ + public CloseQueryCallable(RestSqlQueryRequest req, + ConcurrentHashMap<Long, Iterator> curs, + ConcurrentHashMap<Long, QueryCursor> qryCurs) { + this.req = req; + this.curs = curs; + this.qryCurs = qryCurs; + } + + /** {@inheritDoc} */ + @Override public GridRestResponse call() throws Exception { + try { + QueryCursor cur = qryCurs.get(req.queryId()); + + if (cur == null) + return new GridRestResponse(GridRestResponse.STATUS_FAILED, + "Cannot find query [qryId=" + req.queryId() + "]"); + + cur.close(); + + return new GridRestResponse(true); + } + catch (Exception e) { + qryCurs.remove(req.queryId()); + curs.remove(req.queryId()); + + return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); + } + } + } + + /** * Fetch query callable. */ private static class FetchQueryCallable implements Callable<GridRestResponse> { /** Execute query request. */ private RestSqlQueryRequest req; - /** Queries cursors. */ + /** Queries iterators. */ private ConcurrentHashMap<Long, Iterator> curs; + /** Queries cursors. */ + private ConcurrentHashMap<Long, QueryCursor> qryCurs; + /** * @param req Execute query request. * @param curs Queries cursors. */ - public FetchQueryCallable(RestSqlQueryRequest req, - ConcurrentHashMap<Long, Iterator> curs) { + public FetchQueryCallable(RestSqlQueryRequest req, ConcurrentHashMap<Long, Iterator> curs, + ConcurrentHashMap<Long, QueryCursor> qryCurs) { this.req = req; this.curs = curs; + this.qryCurs = qryCurs; } /** {@inheritDoc} */ @@ -170,12 +241,13 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { return new GridRestResponse(GridRestResponse.STATUS_FAILED, "Cannot find query [qryId=" + req.queryId() + "]"); - CacheQueryResult res = createQueryResult(curs, cur, req, req.queryId()); + CacheQueryResult res = createQueryResult(qryCurs, curs, cur, req, req.queryId()); return new GridRestResponse(res); } catch (Exception e) { curs.remove(req.queryId()); + qryCurs.remove(req.queryId()); return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); } @@ -183,13 +255,15 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { } /** - * @param curs Queries cursors. + * @param qryCurs Query cursors. + * @param curs Queries iterators. * @param cur Current cursor. * @param req Sql request. * @param qryId Query id. * @return Query result with items. */ - private static CacheQueryResult createQueryResult(ConcurrentHashMap<Long, Iterator> curs, Iterator cur, + private static CacheQueryResult createQueryResult(ConcurrentHashMap<Long, QueryCursor> qryCurs, + ConcurrentHashMap<Long, Iterator> curs, Iterator cur, RestSqlQueryRequest req, Long qryId) { CacheQueryResult res = new CacheQueryResult(); @@ -204,8 +278,10 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { res.setQueryId(qryId); - if (!cur.hasNext()) + if (!cur.hasNext()) { + qryCurs.remove(qryId); curs.remove(qryId); + } return res; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/nodejs/src/main/js/cache.js ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/main/js/cache.js b/modules/nodejs/src/main/js/cache.js index 893a945..6cae632 100644 --- a/modules/nodejs/src/main/js/cache.js +++ b/modules/nodejs/src/main/js/cache.js @@ -15,6 +15,7 @@ * limitations under the License. */ +var Promise = require("bluebird"); var Server = require("./server").Server; var Command = require("./server").Command; var SqlFieldsQuery = require("./sql-fields-query").SqlFieldsQuery @@ -48,12 +49,10 @@ Cache.prototype.name = function() { * * @this {Cache} * @param {string} key Key - * @param {onGet} callback Called on finish */ -Cache.prototype.get = function(key, callback) { - this._server.runCommand(this._createCommand("get"). - setPostData(JSON.stringify({"key": key})), - callback); +Cache.prototype.get = function(key) { + return this.__createPromise(this._createCommand("get"). + setPostData(JSON.stringify({"key": key}))); }; /** @@ -62,12 +61,10 @@ Cache.prototype.get = function(key, callback) { * @this {Cache} * @param {string} key Key * @param {string} value Value - * @param {noValue} callback Called on finish */ -Cache.prototype.put = function(key, value, callback) { - this._server.runCommand(this._createCommand("put"). - setPostData(JSON.stringify({"key": key, "val" : value})), - callback); +Cache.prototype.put = function(key, value) { + return this.__createPromise(this._createCommand("put"). + setPostData(JSON.stringify({"key": key, "val" : value}))); } /** @@ -76,12 +73,10 @@ Cache.prototype.put = function(key, value, callback) { * @this {Cache} * @param {string} key Key * @param {string} value Value - * @param {onGet} callback Called on finish */ -Cache.prototype.putIfAbsent = function(key, value, callback) { - this._server.runCommand(this._createCommand("putifabsent"). - setPostData(JSON.stringify({"key": key, "val" : value})), - callback); +Cache.prototype.putIfAbsent = function(key, value) { + return this.__createPromise(this._createCommand("putifabsent"). + setPostData(JSON.stringify({"key": key, "val" : value}))); } /** @@ -89,12 +84,10 @@ Cache.prototype.putIfAbsent = function(key, value, callback) { * * @this {Cache} * @param key Key - * @param {noValue} callback Called on finish */ Cache.prototype.remove = function(key, callback) { - this._server.runCommand(this._createCommand("rmv"). - setPostData(JSON.stringify({"key": key})), - callback); + return this.__createPromise(this._createCommand("rmv"). + setPostData(JSON.stringify({"key": key}))); } /** @@ -103,12 +96,10 @@ Cache.prototype.remove = function(key, callback) { * @this {Cache} * @param key Key * @param value Value - * @param {noValue} callback Called on finish */ Cache.prototype.removeValue = function(key, value, callback) { - this._server.runCommand(this._createCommand("rmvvalue"). - setPostData(JSON.stringify({"key": key, "val" : value})), - callback); + return this.__createPromise(this._createCommand("rmvvalue"). + setPostData(JSON.stringify({"key": key, "val" : value}))); } /** @@ -116,12 +107,10 @@ Cache.prototype.removeValue = function(key, value, callback) { * * @this {Cache} * @param {string} key Key - * @param {onGet} callback Called on finish with previous value */ Cache.prototype.getAndRemove = function(key, callback) { - this._server.runCommand(this._createCommand("getandrmv"). - setPostData(JSON.stringify({"key": key})), - callback); + return this.__createPromise(this._createCommand("getandrmv"). + setPostData(JSON.stringify({"key": key}))); } /** @@ -129,23 +118,19 @@ Cache.prototype.getAndRemove = function(key, callback) { * * @this {Cache} * @param {string[]} keys Keys to remove - * @param {noValue} callback Called on finish */ Cache.prototype.removeAll = function(keys, callback) { - this._server.runCommand(this._createCommand("rmvall"). - setPostData(JSON.stringify({"keys" : keys})), - callback); + return this.__createPromise(this._createCommand("rmvall"). + setPostData(JSON.stringify({"keys" : keys}))); } /** * Remove all cache keys * * @this {Cache} - * @param {noValue} callback Called on finish */ Cache.prototype.removeAllFromCache = function(callback) { - this._server.runCommand(this._createCommand("rmvall"), - callback); + return this.__createPromise(this._createCommand("rmvall")); } /** @@ -153,11 +138,10 @@ Cache.prototype.removeAllFromCache = function(callback) { * * @this {Cache} * @param {CacheEntry[]} List of entries to put in the cache - * @param {noValue} callback Called on finish */ -Cache.prototype.putAll = function(entries, callback) { - this._server.runCommand(this._createCommand("putall").setPostData( - JSON.stringify({"entries" : entries})), callback); +Cache.prototype.putAll = function(entries) { + return this.__createPromise(this._createCommand("putall").setPostData( + JSON.stringify({"entries" : entries}))); } /** @@ -165,28 +149,27 @@ Cache.prototype.putAll = function(entries, callback) { * * @this {Cache} * @param {Object[]} keys Keys - * @param {Cache~onGetAll} callback Called on finish */ Cache.prototype.getAll = function(keys, callback) { - function onGetAll(callback, err, res) { - if (err) { - callback.call(null, err, null); - - return; - } - - var result = []; - - for (var key of res) { - result.push(new CacheEntry(key["key"], key["value"])); - } - - callback.call(null, null, result); - } - - this._server.runCommand(this._createCommand("getall").setPostData( - JSON.stringify({"keys" : keys})), - onGetAll.bind(null, callback)); + var cmd = this._createCommand("getall").setPostData(JSON.stringify({"keys" : keys})); + + var server = this._server; + return new Promise(function(resolve, reject) { + server.runCommand(cmd, function(err, res) { + if(err != null) { + reject(err); + } + else { + var result = []; + + for (var key of res) { + result.push(new CacheEntry(key["key"], key["value"])); + } + + resolve(result); + } + }); + }); } /** @@ -194,11 +177,10 @@ Cache.prototype.getAll = function(keys, callback) { * * @this {Cache} * @param {Object} key Key - * @param {Cache~onGetAll} callback Called on finish with boolean result */ -Cache.prototype.containsKey = function(key, callback) { - this._server.runCommand(this._createCommand("containskey"). - setPostData(JSON.stringify({"key" : key})), callback); +Cache.prototype.containsKey = function(key) { + return this.__createPromise(this._createCommand("containskey"). + setPostData(JSON.stringify({"key" : key}))); } /** @@ -206,11 +188,10 @@ Cache.prototype.containsKey = function(key, callback) { * * @this {Cache} * @param {Object[]} keys Keys - * @param {Cache~onGetAll} callback Called on finish with boolean result */ Cache.prototype.containsKeys = function(keys, callback) { - this._server.runCommand(this._createCommand("containskeys"). - setPostData(JSON.stringify({"keys" : keys})), callback); + return this.__createPromise(this._createCommand("containskeys"). + setPostData(JSON.stringify({"keys" : keys}))); } /** @@ -219,11 +200,10 @@ Cache.prototype.containsKeys = function(keys, callback) { * @this {Cache} * @param {string} key Key * @param {string} value Value - * @param {onGet} callback Called on finish */ -Cache.prototype.getAndPut = function(key, val, callback) { - this._server.runCommand(this._createCommand("getandput"). - setPostData(JSON.stringify({"key" : key, "val" : val})), callback); +Cache.prototype.getAndPut = function(key, val) { + return this.__createPromise(this._createCommand("getandput"). + setPostData(JSON.stringify({"key" : key, "val" : val}))); } /** @@ -232,11 +212,10 @@ Cache.prototype.getAndPut = function(key, val, callback) { * @this {Cache} * @param key Key * @param value Value - * @param {onGet} callback Called on finish */ Cache.prototype.replace = function(key, val, callback) { - this._server.runCommand(this._createCommand("rep"). - setPostData(JSON.stringify({"key" : key, "val" : val})), callback); + return this.__createPromise(this._createCommand("rep"). + setPostData(JSON.stringify({"key" : key, "val" : val}))); } /** @@ -246,11 +225,10 @@ Cache.prototype.replace = function(key, val, callback) { * @param key Key * @param value Value * @param oldVal Old value - * @param {onGet} callback Called on finish */ -Cache.prototype.replaceValue = function(key, val, oldVal, callback) { - this._server.runCommand(this._createCommand("repVal"). - setPostData(JSON.stringify({"key" : key, "val" : val, "oldVal" : oldVal})), callback); +Cache.prototype.replaceValue = function(key, val, oldVal) { + return this.__createPromise(this._createCommand("repVal"). + setPostData(JSON.stringify({"key" : key, "val" : val, "oldVal" : oldVal}))); } /** @@ -259,11 +237,10 @@ Cache.prototype.replaceValue = function(key, val, oldVal, callback) { * @this {Cache} * @param {string} key Key * @param {string} value Value - * @param {onGet} callback Called on finish */ -Cache.prototype.getAndReplace = function(key, val, callback) { - this._server.runCommand(this._createCommand("getandreplace"). - setPostData(JSON.stringify({"key" : key, "val" : val})), callback); +Cache.prototype.getAndReplace = function(key, val) { + return this.__createPromise(this._createCommand("getandreplace"). + setPostData(JSON.stringify({"key" : key, "val" : val}))); } /** @@ -272,92 +249,224 @@ Cache.prototype.getAndReplace = function(key, val, callback) { * @this {Cache} * @param {string} key Key * @param {string} value Value - * @param {onGet} callback Called on finish */ -Cache.prototype.getAndPutIfAbsent = function(key, val, callback) { - this._server.runCommand(this._createCommand("getandputifabsent"). - setPostData(JSON.stringify({"key" : key, "val" : val})), callback); +Cache.prototype.getAndPutIfAbsent = function(key, val) { + return this.__createPromise(this._createCommand("getandputifabsent"). + setPostData(JSON.stringify({"key" : key, "val" : val}))); } /** * @this {Cache} - * @param {onGet} callback Called on finish */ Cache.prototype.size = function(callback) { - this._server.runCommand(this._createCommand("cachesize"), callback); + return this.__createPromise(this._createCommand("cachesize")); } /** * Execute sql query * * @param {SqlQuery|SqlFieldsQuery} qry Query + * @returns {QueryCursor} Cursor for current query. */ Cache.prototype.query = function(qry) { - function onQueryExecute(qry, error, res) { - if (error !== null) { - qry.end(error); + return new QueryCursor(this, qry, true, null); +} - return; - } +Cache.prototype.__createPromise = function(cmd) { + var server = this._server; + + return new Promise(function(resolve, reject) { + server.runCommand(cmd, function(err, res) { + if(err != null) { + reject(err); + } + else { + resolve(res); + } + }); + }); +} - qry.page(res["items"]); +Cache.prototype._createCommand = function(name) { + var command = new Command(name); - if (res["last"]) { - qry.end(null); - } - else { - var command = this._createCommand("qryfetch"); + return command.addParam("cacheName", this._cacheName); +} - command.addParam("qryId", res.queryId).addParam("psz", qry.pageSize()); +/** + * Creates an instance of QueryCursor + * + * @constructor + * @this {QueryCursor} + * @param {Cache} cache Cache that runs query + * @param {SqlQuery|SqlFieldsQuery} qry Sql query + * @param {boolean} init True if query is not started + * @param {Object[]} res Current page result + */ +QueryCursor = function(cache, qry, init, res) { + this._qry = qry; + this._cache = cache; + this._init = init; + this._res = res; +} - this._server.runCommand(command, onQueryExecute.bind(this, qry)); - } +/** + * Gets Promise with all query results. + * Use this method when you know in advance that query result is + * relatively small and will not cause memory utilization issues. + * <p> + * Since all the results will be fetched, all the resources will be closed + * automatically after this call, e.g. there is no need to call close() method in this case. + * + * @this{QueryCursor} + * @returns {Promise} Promise with query result + */ +QueryCursor.prototype.getAll = function() { + if (!this._init) { + return new Promise(function(resolve, reject){ + reject("GetAll is called after nextPage."); + }); } - if (qry.type() === "Sql") { - this._sqlQuery(qry, onQueryExecute); - } - else { - this._sqlFieldsQuery(qry, onQueryExecute); - } -} + var cmd = this._getQueryCommand(); + var server = this._cache._server; + var cursor = this; + + return new Promise(function(resolve, reject) { + var fullRes = []; + + onResult = function (err, res){ + if (err !== null) { + reject(err); + } + else { + cursor._res = res; + + fullRes = fullRes.concat(res["items"]); + + if (res["last"]) { + resolve(fullRes); + } + else { + server.runCommand(cursor._getQueryCommand(), onResult); + } + } + } -Cache.prototype._sqlFieldsQuery = function(qry, onQueryExecute) { - var command = this._createQueryCommand("qryfieldsexecute", qry); + server.runCommand(cmd, onResult); + }); +} - command.setPostData(JSON.stringify({"arg" : qry.arguments()})); +/** + * Gets Promise with Cursor on next page of the query results. + * + * @this{QueryCursor} + * @returns {Promise} Promise with Cursor on next page + */ +QueryCursor.prototype.nextPage = function() { + if (this._res !== null && this._res["last"]) { + throw "All pages are returned."; + } - this._server.runCommand(command, onQueryExecute.bind(this, qry)); + var cmd = this._getQueryCommand(); + var server = this._cache._server; + var qry = this._qry; + var cache = this._cache; + + return new Promise(function(resolve, reject) { + server.runCommand(cmd, function(err, res) { + if(err !== null) { + reject(err); + } + else { + resolve(new QueryCursor(cache, qry, false, res)); + } + }); + }); } -Cache.prototype._sqlQuery = function(qry, onQueryExecute) { - if (qry.returnType() == null) { - qry.end("No type for sql query."); +/** + * Gets collections of the query page results. + * + * @this{QueryCursor} + * @returns {Object[]} Query page result. + */ +QueryCursor.prototype.page = function() { + if (this._res === null) + return null; - return; - } + return this._res["items"]; +} - var command = this._createQueryCommand("qryexecute", qry); +/** + * Closes all resources related to this cursor. + * + * @this{QueryCursor} + * @returns {Promise} Promise on cursor close. + */ +QueryCursor.prototype.close = function() { + if (this._init) { + return new Promise(function(resolve, reject) { + return resolve(true); + }); + } - command.addParam("type", qry.returnType()); + var server = this._cache._server; + var cmd = this._createQueryCommand("qryclose", this._qry).addParam("qryId", this._res.queryId); + + return new Promise(function(resolve, reject) { + server.runCommand(cmd, function(err, res) { + if(err != null) { + reject(err); + } + else { + resolve(true); + } + }); + }); +} - command.setPostData(JSON.stringify({"arg" : qry.arguments()})); +/** + * Returns True if the iteration has no more elements. + * + * @this{QueryCursor} + * @returns {boolean} True if it is the last page + */ +QueryCursor.prototype.isFinished = function() { + if (this._res === null) + return false; - this._server.runCommand(command, onQueryExecute.bind(this, qry)); + return this._res["last"]; } -Cache.prototype._createCommand = function(name) { - var command = new Command(name); +QueryCursor.prototype._getQueryCommand = function() { + if (this._init) { + if (this._qry.type() === "Sql") { + return this._sqlQuery(this._qry); + } - return command.addParam("cacheName", this._cacheName); + this._init = false; + + return this._sqlFieldsQuery(this._qry); + } + + return this._cache._createCommand("qryfetch").addParam("qryId", this._res.queryId). + addParam("psz", this._qry.pageSize()); } -Cache.prototype._createQueryCommand = function(name, qry) { - var command = this._createCommand(name); +QueryCursor.prototype._sqlFieldsQuery = function(qry) { + return this._createQueryCommand("qryfieldsexecute", qry). + setPostData(JSON.stringify({"arg" : qry.arguments()})); +} - command.addParam("qry", qry.query()); +QueryCursor.prototype._sqlQuery = function(qry) { + return this._createQueryCommand("qryexecute", qry).addParam("type", qry.returnType()). + setPostData(JSON.stringify({"arg" : qry.arguments()})); +} - return command.addParam("psz", qry.pageSize()); +QueryCursor.prototype._createQueryCommand = function(name, qry) { + return new Command(name).addParam("cacheName", this._cache._cacheName). + addParam("qry", qry.query()).addParam("psz", qry.pageSize()); } /** @@ -370,13 +479,5 @@ function CacheEntry(key0, val0) { this.value = val0; } -/** - * Callback for cache get - * - * @callback Cache~onGetAll - * @param {string} error Error - * @param {string[]} results Result values - */ - exports.Cache = Cache exports.CacheEntry = CacheEntry \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/nodejs/src/main/js/compute.js ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/main/js/compute.js b/modules/nodejs/src/main/js/compute.js index 5c28418..53cd11c 100644 --- a/modules/nodejs/src/main/js/compute.js +++ b/modules/nodejs/src/main/js/compute.js @@ -31,11 +31,10 @@ function Compute(server) { * @this {Compute} * @param job Function * @param args Function arguments - * @param {onGet} callback Callback */ -Compute.prototype.run = function(job, args, callback) { - this._server.runCommand(new Command("runscript").addParam("func", job). - setPostData(JSON.stringify({"arg" : args})), callback); +Compute.prototype.run = function(job, args) { + return this._createPromise(new Command("runscript").addParam("func", job). + setPostData(JSON.stringify({"arg" : args}))); } /** @@ -46,11 +45,10 @@ Compute.prototype.run = function(job, args, callback) { * @param {string|number|JSONObject} key Key. * @param job Function * @param args Function arguments - * @param {onGet} callback Callback */ -Compute.prototype.affinityRun = function(cacheName, key, job, args, callback) { - this._server.runCommand(new Command("affrun").addParam("func", job).addParam("cacheName", cacheName). - setPostData(JSON.stringify({"arg" : args, "key" : key})), callback); +Compute.prototype.affinityRun = function(cacheName, key, job, args) { + return this._createPromise(new Command("affrun").addParam("func", job).addParam("cacheName", cacheName). + setPostData(JSON.stringify({"arg" : args, "key" : key}))); } /** @@ -61,14 +59,26 @@ Compute.prototype.affinityRun = function(cacheName, key, job, args, callback) { * @param {onGet} callback Callback */ Compute.prototype.mapReduce = function(map, reduce, arg, callback) { - var command = new Command("excmapreduce"); - - command.addParam("map", map).addParam("reduce", reduce); - command.setPostData(JSON.stringify({"arg" : arg})); + var cmd = new Command("excmapreduce").addParam("map", map).addParam("reduce", reduce). + setPostData(JSON.stringify({"arg" : arg})); - this._server.runCommand(command, callback); + return this._createPromise(cmd); } + +Compute.prototype._createPromise = function(cmd) { + var server = this._server; + return new Promise(function(resolve, reject) { + server.runCommand(cmd, function(err, res) { + if (err != null) { + reject(err); + } + else { + resolve(res); + } + }); + }); +} /** * @name EmitFunction * @function http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/nodejs/src/main/js/ignite.js ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/main/js/ignite.js b/modules/nodejs/src/main/js/ignite.js index a4a1dd9..c3d90ac 100644 --- a/modules/nodejs/src/main/js/ignite.js +++ b/modules/nodejs/src/main/js/ignite.js @@ -15,10 +15,10 @@ * limitations under the License. */ +var Promise = require("bluebird"); var Cache = require("./cache").Cache; var Compute = require("./compute").Compute; var ClusterNode = require("./cluster-node").ClusterNode; -var Server = require("./server").Server; var Command = require("./server").Command; /** @@ -56,21 +56,20 @@ Ignite.prototype.cache = function(cacheName) { * * @this {Ignite} * @param {string} Cache name - * @param callback Callback with cache. */ -Ignite.prototype.getOrCreateCache = function(cacheName, callback) { - var onCreateCallback = function(callback, err, res) { - if (err !== null) { - callback.call(null, err, null); - - return; - } - - callback.call(null, null, new Cache(this._server, cacheName)) - } - - this._server.runCommand(new Command("getorcreatecache").addParam("cacheName", cacheName), - onCreateCallback.bind(this, callback)); +Ignite.prototype.getOrCreateCache = function(cacheName) { + var server = this._server; + return new Promise(function(resolve, reject) { + server.runCommand(new Command("getorcreatecache").addParam("cacheName", cacheName), + function(err, res) { + if (err != null) { + reject(err); + } + else { + resolve(new Cache(server, cacheName)); + } + }); + }); } /** @@ -78,10 +77,9 @@ Ignite.prototype.getOrCreateCache = function(cacheName, callback) { * * @this {Ignite} * @param {string} cacheName Cache name to stop - * @param {noValue} callback Callback contains only error */ -Ignite.prototype.destroyCache = function(cacheName, callback) { - this._server.runCommand(new Command("destroycache").addParam("cacheName", cacheName), callback); +Ignite.prototype.destroyCache = function(cacheName) { + return this._createPromise(new Command("destroycache").addParam("cacheName", cacheName)); } /** @@ -98,51 +96,62 @@ Ignite.prototype.compute = function() { * Ignite version * * @this {Ignite} - * @param {onGet} callback Result in callback contains string with Ignite version. */ -Ignite.prototype.version = function(callback) { - this._server.runCommand(new Command("version"), callback); +Ignite.prototype.version = function() { + return this._createPromise(new Command("version")); } /** * Connected ignite name * * @this {Ignite} - * @param {onGet} callback Result in callback contains string with Ignite name. */ -Ignite.prototype.name = function(callback) { - this._server.runCommand(new Command("name"), callback); +Ignite.prototype.name = function() { + return this._createPromise(new Command("name")); } /** * @this {Ignite} - * @param {onGet} callback Result in callback contains list of ClusterNodes */ -Ignite.prototype.cluster = function(callback) { - function onTop(callback, err, res) { - if (err) { - callback.call(null, err, null); - - return; - } - - if (!res || res.length == 0) { - callback.call(null, "Empty topology cluster.", null); - - return; - } - - var nodes = []; - - for (var node of res) { - nodes.push(new ClusterNode(node.nodeId, node.attributes)); - } - - callback.call(null, null, nodes); - } +Ignite.prototype.cluster = function() { + var cmd = new Command("top").addParam("attr", "true").addParam("mtr", "false"); + + var server = this._server; + return new Promise(function(resolve, reject) { + server.runCommand(cmd, function(err, res) { + if (err != null) { + reject(err); + } + else { + if (!res || res.length == 0) { + reject("Empty topology cluster."); + } + else { + var nodes = []; + + for (var node of res) { + nodes.push(new ClusterNode(node.nodeId, node.attributes)); + } + + resolve(nodes); + } + } + }); + }); +} - this._server.runCommand(new Command("top").addParam("attr", "true").addParam("mtr", "false"), - onTop.bind(null, callback)); +Ignite.prototype._createPromise = function(cmd) { + var server = this._server; + return new Promise(function(resolve, reject) { + server.runCommand(cmd, function(err, res) { + if (err != null) { + reject(err); + } + else { + resolve(res); + } + }); + }); } exports.Ignite = Ignite; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/nodejs/src/main/js/ignition.js ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/main/js/ignition.js b/modules/nodejs/src/main/js/ignition.js index 049eb4b..a7d4518 100644 --- a/modules/nodejs/src/main/js/ignition.js +++ b/modules/nodejs/src/main/js/ignition.js @@ -14,7 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - +var Server = require("./server").Server; +var Ignite = require("./ignite").Ignite /** * Creates an instance of Ignition * @@ -36,73 +37,70 @@ function Ignition() { * * @param {string[]} address List of nodes hosts with ports * @param {string} secretKey Secret key. - * @param {Ignition~onStart} callback Called on finish */ -Ignition.start = function(address, secretKey, callback) { - var Server = require("./server").Server; - var Ignite = require("./ignite").Ignite - - var numConn = 0; - - for (var addr of address) { - var params = addr.split(":"); - - var portsRange = params[1].split(".."); - - var start; - var end; - - if (portsRange.length === 1) { - start = parseInt(portsRange[0], 10); - end = start; - } - else if (portsRange.length === 2) { - start = parseInt(portsRange[0], 10); - end = parseInt(portsRange[1], 10); +Ignition.start = function(address, secretKey) { + return new Promise(function(resolve, reject) { + var numConn = 0; + + var needVal = true; + + for (var addr of address) { + var params = addr.split(":"); + + var portsRange = params[1].split(".."); + + var start; + var end; + + if (portsRange.length === 1) { + start = parseInt(portsRange[0], 10); + end = start; + } + else if (portsRange.length === 2) { + start = parseInt(portsRange[0], 10); + end = parseInt(portsRange[1], 10); + } + + if (isNaN(start) || isNaN(end)) { + needVal = false; + + reject("Incorrect address format."); + } + else { + for (var i = start; i <= end; i++) { + checkServer(params[0], i, secretKey); + } + } } - if (isNaN(start) || isNaN(end)) { - incorrectAddress(); + function checkServer(host, port, secretKey) { + numConn++; - return; - } + var server = new Server(host, port, secretKey); - for (var i = start; i <= end; i++) { - checkServer(params[0], i, secretKey); + server.checkConnection(onConnect.bind(this, server)); } - } - - function checkServer(host, port, secretKey) { - numConn++; - - var server = new Server(host, port, secretKey); - server.checkConnection(onConnect.bind(null, server)); - } + function onConnect(server, error) { + if (!needVal) return; - function incorrectAddress() { - callback.call(null, "Incorrect address format.", null); + numConn--; - callback = null; - } + if (!error) { + resolve(new Ignite(server)); - function onConnect(server, error) { - if (!callback) return; + needVal = false; - numConn--; + return; + } - if (!error) { - callback.call(null, null, new Ignite(server)); - - callback = null; - - return; - } + if (!numConn) { + reject("Cannot connect to servers. " + error); - if (!numConn) { - callback.call(null, "Cannot connect to servers. " + error, null); + return; + } } - } + }); } exports.Ignition = Ignition; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/nodejs/src/main/js/package.json ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/main/js/package.json b/modules/nodejs/src/main/js/package.json index ae4b911..47c627e 100644 --- a/modules/nodejs/src/main/js/package.json +++ b/modules/nodejs/src/main/js/package.json @@ -10,5 +10,8 @@ "license" : "Apache-2.0", "keywords" : "grid", "homepage" : "https://ignite.incubator.apache.org/", - "engines" : { "node" : ">=0.12.4" } + "engines" : { "node" : ">=0.12.4" }, + "dependencies" : { + "bluebird" : ">=2.0.0" + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/nodejs/src/main/js/server.js ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/main/js/server.js b/modules/nodejs/src/main/js/server.js index 5d7430a..f8a98ab 100644 --- a/modules/nodejs/src/main/js/server.js +++ b/modules/nodejs/src/main/js/server.js @@ -86,6 +86,8 @@ Server.prototype.runCommand = function(cmd, callback) { }); response.on('end', function () { + console.log("Full response:" + fullResponseString); + if (response.statusCode !== 200) { if (response.statusCode === 401) { callback.call(null, "Authentication failed. Status code 401."); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsIgnitionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsIgnitionSelfTest.java b/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsIgnitionSelfTest.java index 205e467..bdbebab 100644 --- a/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsIgnitionSelfTest.java +++ b/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsIgnitionSelfTest.java @@ -41,28 +41,28 @@ public class NodeJsIgnitionSelfTest extends NodeJsAbstractTest { /** * @throws Exception If failed. */ - public void testIgnitionStart() throws Exception { - runJsScript("ignitionStartSuccess"); + public void testIgnitionStartSuccess() throws Exception { + runJsScript("testIgnitionStartSuccess"); } /** * @throws Exception If failed. */ - public void testIgnitionFailedStart() throws Exception { + public void testIgnitionFail() throws Exception { runJsScript("testIgnitionFail"); } /** * @throws Exception If failed. */ - public void testIgnitionStartWithSeveralPorts() throws Exception { - runJsScript("ignitionStartSuccessWithSeveralPorts"); + public void testIgnitionStartSuccessWithSeveralPorts() throws Exception { + runJsScript("testIgnitionStartSuccessWithSeveralPorts"); } /** * @throws Exception If failed. */ public void testIgnitionNotStartWithSeveralPorts() throws Exception { - runJsScript("ignitionNotStartWithSeveralPorts"); + runJsScript("testIgnitionNotStartWithSeveralPorts"); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8616eebb/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsSqlQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsSqlQuerySelfTest.java b/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsSqlQuerySelfTest.java index 9024b93..9a29f52 100644 --- a/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsSqlQuerySelfTest.java +++ b/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsSqlQuerySelfTest.java @@ -64,6 +64,24 @@ public class NodeJsSqlQuerySelfTest extends NodeJsAbstractTest { /** * @throws Exception If failed. */ + public void testCloseQuery() throws Exception { + initCache(); + + runJsScript("testCloseQuery"); + } + + /** + * @throws Exception If failed. + */ + public void testSqlFieldsGetAllQuery() throws Exception { + initCache(); + + runJsScript("testSqlFieldsGetAllQuery"); + } + + /** + * @throws Exception If failed. + */ public void testSqlQueryWithParams() throws Exception { initCache();