Repository: incubator-ignite Updated Branches: refs/heads/ignite-965 01c9c3acb -> 47276a81e
#ignite-965: map and reduce are executed on server. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d481ea8c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d481ea8c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d481ea8c Branch: refs/heads/ignite-965 Commit: d481ea8c76f9de8e0e94e6a2ee7a46d3788c1442 Parents: 01c9c3a Author: ivasilinets <ivasilin...@gridgain.com> Authored: Tue Jun 23 18:08:27 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Tue Jun 23 18:08:27 2015 +0300 ---------------------------------------------------------------------- .../IgniteComputeTaskCommandHandler.java | 102 ++++++++++++++----- .../rest/request/RestComputeTaskRequest.java | 38 +++++-- .../scripting/IgniteScriptProcessor.java | 22 +++- modules/nodejs/src/main/js/apache-ignite.js | 3 +- modules/nodejs/src/main/js/compute-task.js | 38 ------- modules/nodejs/src/main/js/compute.js | 75 ++------------ modules/nodejs/src/test/js/test-compute.js | 37 ++++--- modules/nodejs/src/test/js/test-utils.js | 10 +- .../http/jetty/GridJettyRestHandler.java | 17 +--- 9 files changed, 161 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d481ea8c/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/compute/IgniteComputeTaskCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/compute/IgniteComputeTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/compute/IgniteComputeTaskCommandHandler.java index a3708ab..7329a5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/compute/IgniteComputeTaskCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/compute/IgniteComputeTaskCommandHandler.java @@ -25,12 +25,11 @@ import org.apache.ignite.internal.processors.rest.*; import org.apache.ignite.internal.processors.rest.handlers.*; import org.apache.ignite.internal.processors.rest.request.*; import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.resources.*; import org.jetbrains.annotations.*; -import javax.script.ScriptException; +import javax.script.*; import java.util.*; import static org.apache.ignite.internal.processors.rest.GridRestCommand.*; @@ -64,25 +63,28 @@ public class IgniteComputeTaskCommandHandler extends GridRestCommandHandlerAdapt final RestComputeTaskRequest req0 = (RestComputeTaskRequest) req; - List<T3<String, String, String>> mapping = req0.mapping(); - - Object res = ctx.grid().compute().execute(new JsTask(mapping, ctx), null); + Object res = ctx.grid().compute().execute(new JsTask(req0.mapFunc(), req0.argument(), req0.reduceFunc(), ctx), null); return new GridFinishedFuture<>(new GridRestResponse(res)); } private static class JsTask extends ComputeTaskAdapter<String, Object> { - /** Mapping. */ - private List<T3<String, String, String>> mapping; + /** Mapping function. */ + private String mapFunc; + + private String reduceFunc; /** Grid kernal context. */ private GridKernalContext ctx; + private String arg; + /** - * @param mapping Task mapping. */ - public JsTask(List<T3<String, String, String>> mapping, GridKernalContext ctx) { - this.mapping = mapping; + public JsTask(String mapFunc, String arg, String reduceFunc, GridKernalContext ctx) { + this.mapFunc = mapFunc; + this.reduceFunc = reduceFunc; + this.arg = arg; this.ctx = ctx; } @@ -90,27 +92,68 @@ public class IgniteComputeTaskCommandHandler extends GridRestCommandHandlerAdapt @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> nodes, String arg) { Map<ComputeJob, ClusterNode> map = new HashMap<>(); - for (final T3<String, String, String> job : mapping) { - UUID nodeId = UUID.fromString(job.get2()); + String nodesIds = "["; - ClusterNode node = ctx.grid().cluster().node(nodeId); + for (ClusterNode node : nodes) + nodesIds += "\"" + node.id().toString() + "\"" + ","; - map.put(new ComputeJobAdapter() { - /** Ignite. */ - @IgniteInstanceResource - private Ignite ignite; + nodesIds = nodesIds.substring(0, nodesIds.length() - 1) + "]"; - @Override public Object execute() throws IgniteException { - System.out.println("Compute job on node " + ignite.cluster().localNode().id()); + try { + String newMap = new String("function () {\n" + + " var res = [];\n" + + " var resCont = function(f, args, nodeId) {\n" + + " res.push([f.toString(), args, nodeId])\n" + + " }\n" + + " var locF = " + mapFunc + "; \n locF(" + + nodesIds + ", " + + "\"" + this.arg + "\"" + + ", resCont.bind(null)" + ");\n" + + " return res;\n" + + "}"); - try { - return ((IgniteKernal)ignite).context().scripting().runJS(job.get1(), job.get3()); - } - catch (ScriptException e) { - throw new IgniteException(e); + List mapRes = (List)ctx.scripting().runJS(newMap); + + for (Object arr : mapRes) { + Object[] nodeTask = ((List)arr).toArray(); + + final String func = (String)nodeTask[0]; + + final List argv = (List) nodeTask[1]; + + String nodeIdStr = (String) nodeTask[2]; + + UUID nodeId = UUID.fromString(nodeIdStr); + + ClusterNode node = ctx.grid().cluster().node(nodeId); + + map.put(new ComputeJobAdapter() { + /** Ignite. */ + @IgniteInstanceResource + private Ignite ignite; + + @Override public Object execute() throws IgniteException { + System.out.println("Compute job on node " + ignite.cluster().localNode().id()); + try { + String[] argv1 = new String[argv.size()]; + + for (int i = 0; i < argv1.length; ++i) + argv1[i] = "\"" + argv.get(i).toString() + "\""; + + return ctx.scripting().runJS(func, argv1); + } + catch (Exception e) { + throw new IgniteException(e); + } } - } - }, node); + }, node); + + } + } + catch (ScriptException e) { + throw new IgniteException(e); + } + finally { } return map; @@ -123,7 +166,12 @@ public class IgniteComputeTaskCommandHandler extends GridRestCommandHandlerAdapt for (ComputeJobResult res : results) data.add(res.getData()); - return data; + try { + return ctx.scripting().runJS(reduceFunc, new String[] {data.toString()}); + } + catch (ScriptException e) { + throw new IgniteException(e); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d481ea8c/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestComputeTaskRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestComputeTaskRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestComputeTaskRequest.java index bae7ce5..89e04da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestComputeTaskRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestComputeTaskRequest.java @@ -26,19 +26,35 @@ import java.util.*; */ public class RestComputeTaskRequest extends GridRestRequest { /** Mapping tasks to nodes. */ - private List<T3<String, String, String>> mapping; + private String mapFunc; - /** - * @param mapping Mapping tasks to nodes. - */ - public void mapping(List<T3<String, String, String>> mapping) { - this.mapping = mapping; + /** Function argument. */ + private String arg; + + private String reduceFunc; + + public void reduceFunc(String reduceFunc) { + this.reduceFunc = reduceFunc; + } + + public String reduceFunc() { + + return reduceFunc; + } + + public String mapFunc() { + return mapFunc; + } + + public String argument() { + return arg; + } + + public void mapFunc(String mapFunc) { + this.mapFunc = mapFunc; } - /** - * @return Mapping tasks to nodes. - */ - public List<T3<String, String, String>> mapping() { - return mapping; + public void argument(String arg) { + this.arg = arg; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d481ea8c/modules/core/src/main/java/org/apache/ignite/internal/processors/scripting/IgniteScriptProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/scripting/IgniteScriptProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/scripting/IgniteScriptProcessor.java index 54004de..99c206e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/scripting/IgniteScriptProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/scripting/IgniteScriptProcessor.java @@ -20,8 +20,10 @@ package org.apache.ignite.internal.processors.scripting; import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.util.typedef.*; import javax.script.*; +import java.util.*; /** * Ignite scripting manager. @@ -46,7 +48,7 @@ public class IgniteScriptProcessor extends GridProcessorAdapter { * @throws ScriptException If script failed. */ public Object run(String engName, String script) throws ScriptException { - if (engName.equals(JAVA_SCRIPT_ENGINE_NAME)) + if (!engName.equals(JAVA_SCRIPT_ENGINE_NAME)) throw new IgniteException("Engine is not supported. [engName=" + engName + "]"); return runJS(script); @@ -54,9 +56,11 @@ public class IgniteScriptProcessor extends GridProcessorAdapter { /** * @param script Script. + * @param args Arguments. + * @return Script result. * @throws ScriptException If script failed. */ - public Object runJS(String script, String args) throws ScriptException { + public Object runJS(String script, String[] args) throws ScriptException { ScriptEngine engine = factory.getEngineByName("JavaScript"); Bindings b = engine.createBindings(); @@ -65,13 +69,23 @@ public class IgniteScriptProcessor extends GridProcessorAdapter { engine.setBindings(b, ScriptContext.ENGINE_SCOPE); - script = "(" + script + ")(" + args + ");"; + script = "(" + script + ")(" ; + + for (int i = 0; i < args.length; ++i) + script += args[i] + (i < args.length - 1 ? "," : ""); + + script += ");"; return engine.eval(script); } + /** + * @param script Script. + * @return Script result. + * @throws ScriptException If script failed. + */ public Object runJS(String script) throws ScriptException { - return runJS(script, ""); + return runJS(script, new String[]{""}); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d481ea8c/modules/nodejs/src/main/js/apache-ignite.js ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/main/js/apache-ignite.js b/modules/nodejs/src/main/js/apache-ignite.js index f90cf68..0df6160 100644 --- a/modules/nodejs/src/main/js/apache-ignite.js +++ b/modules/nodejs/src/main/js/apache-ignite.js @@ -20,6 +20,5 @@ module.exports = { Ignition : require('./ignition.js').Ignition, Server : require('./server.js').Server, Ignite : require('./ignite.js').Ignite, - Compute : require('./compute.js').Compute, - ComputeJob: require('./compute.js').ComputeJob + Compute : require('./compute.js').Compute } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d481ea8c/modules/nodejs/src/main/js/compute-task.js ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/main/js/compute-task.js b/modules/nodejs/src/main/js/compute-task.js deleted file mode 100644 index d13f361..0000000 --- a/modules/nodejs/src/main/js/compute-task.js +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * @constructor - * @this {ComputeTask} - */ -function ComputeTask() { -} - -/** - * @param {string[]} nodes Nodes id - * @param {string} arg Argument - * @returns {Object.<string, Cache~onGet>} Map of grid jobs assigned to subgrid node. Unless {@link ComputeTaskContinuousMapper} is - * injected into task, if {@code null} or empty map is returned, exception will be thrown - */ -ComputeTask.prototype.map = function(nodes, arg) { -} - -/** - * @param {string[]} results Results - */ -ComputeTask.prototype.reduce = function(results) { -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d481ea8c/modules/nodejs/src/main/js/compute.js ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/main/js/compute.js b/modules/nodejs/src/main/js/compute.js index fb38e7a..7a5f808 100644 --- a/modules/nodejs/src/main/js/compute.js +++ b/modules/nodejs/src/main/js/compute.js @@ -16,7 +16,6 @@ */ var Server = require("./server").Server; -var ComputeTask = require("./compute-task").ComputeTask; /** * @constructor @@ -72,76 +71,14 @@ Compute.prototype._escape = function(f) { * @param {string} arg Argument * @param {} callback Callback */ -Compute.prototype.execute = function(task, arg, callback) { - this._nodes(this._onNodesExecute.bind(this, task, arg, callback)); -} - -Compute.prototype._nodes = function(callback) { - this._server.runCommand("top", [Server.pair("mtr", "false"), Server.pair("attr", "false")], - this._onNodes.bind(this, callback)) -} - -Compute.prototype._onNodes = function(callback, error, results) { - if (error) { - callback.call(null, error, null); - - return; - } - - var nodes = []; - - for (var res of results) { - nodes.push(res["nodeId"]) - } - - callback.call(null, null, nodes); -} - -Compute.prototype._onNodesExecute = function(task, arg, callback, err, nodes) { - if (err) { - callback.call(null, error, null); - - return; - } - - var computeJobList = task.map(nodes, arg); +Compute.prototype.execute = function(map, reduce, arg, callback) { + var params = []; - var params = []; - var i = 1; + params.push(Server.pair("map", this._escape(map))); + params.push(Server.pair("reduce", this._escape(reduce))); + params.push(Server.pair("arg", this._escape(arg))); - console.log("TASK" + computeJobList); - for (var job of computeJobList) { - params.push(Server.pair("f" + i, this._escape(job.func))); - params.push(Server.pair("args" + i, JSON.stringify(job.args))); - params.push(Server.pair("n" + i, job.node)); - i++; - } - - this._server.runCommand("exectask", params, this._onResExecute.bind(this, task, callback)); -} - - -Compute.prototype._onResExecute = function(task, callback, err, results) { - if (err) { - callback.call(null, err, null); - - return; - } - - console.log("ON RES EXEC = " + results); - - var res = task.reduce(results); - - callback.call(null, null, res); + this._server.runCommand("exectask", params, callback); } exports.Compute = Compute - - -function ComputeJob(func, args, node) { - this.func = func; - this.args = args; - this.node = node; -} - -exports.ComputeJob = ComputeJob; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d481ea8c/modules/nodejs/src/test/js/test-compute.js ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/test/js/test-compute.js b/modules/nodejs/src/test/js/test-compute.js index 87a284c..c444380 100644 --- a/modules/nodejs/src/test/js/test-compute.js +++ b/modules/nodejs/src/test/js/test-compute.js @@ -17,10 +17,6 @@ var TestUtils = require("./test-utils").TestUtils; -var Apache = require(TestUtils.scriptPath()); -var Cache = Apache.Cache; -var Server = Apache.Server; - var assert = require("assert"); testComputeAffinityRun = function() { @@ -32,11 +28,7 @@ testComputeAffinityCall = function() { } testComputeExecute = function() { - var CharacterCountTask = require("./simple-compute-task").CharacterCountTask - - var task = new CharacterCountTask(); - - TestUtils.startIgniteNode(onStart1.bind(null, task)); + TestUtils.startIgniteNode(onStart1); } function onStart(locOnPut, error, ignite) { @@ -92,10 +84,31 @@ function onError1(error, res) { TestUtils.testDone(); } -function onStart1(task, error, ignite) { - var comp = ignite.compute(); +function onStart1(error, ignite) { + var map = function(nodes, arg, emit) { + var words = arg.split(" "); + + for (var i = 0; i < words.length; i++) { + var f = function (word) { + println(">>> Printing " + word); + + return word.length; + }; + + emit(f, [words[i]], nodes[i % nodes.length]); + } + }; + + var reduce = function(results) { + var sum = 0; + + for (var i = 0; i < results.length; ++i) + sum += parseInt(results[i], 10); + + return sum; + }; - comp.execute(task, "Hi Alice", onComputeResult); + ignite.compute().execute(map, reduce, "Hi Alice", onComputeResult); } function onComputeResult(error, res) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d481ea8c/modules/nodejs/src/test/js/test-utils.js ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/test/js/test-utils.js b/modules/nodejs/src/test/js/test-utils.js index 135da8c..ee83d7f 100644 --- a/modules/nodejs/src/test/js/test-utils.js +++ b/modules/nodejs/src/test/js/test-utils.js @@ -129,8 +129,9 @@ TestUtils.testDone = function() { * @param {Ignition~onStart} callback Called on connect */ TestUtils.startIgniteNode = function(callback) { - var Apache = require(TestUtils.scriptPath()); - var Ignition = Apache.Ignition; + var Ignite = require(TestUtils.scriptPath()); + var Ignition = Ignite.Ignition; + Ignition.start(['127.0.0.1:9095'], null, callback); } @@ -141,8 +142,9 @@ TestUtils.startIgniteNode = function(callback) { * @param {Ignition~onStart} callback Called on connect */ TestUtils.startIgniteNodeWithKey = function(secretKey, callback) { - var Apache = require(TestUtils.scriptPath()); - var Ignition = Apache.Ignition; + var Ignite = require(TestUtils.scriptPath()); + var Ignition = Ignite.Ignition; + Ignition.start(['127.0.0.1:9095'], secretKey, callback); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d481ea8c/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java ---------------------------------------------------------------------- diff --git a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java index 5fe4cd9..7d71530 100644 --- a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java +++ b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java @@ -462,20 +462,9 @@ public class GridJettyRestHandler extends AbstractHandler { case EXECUTE_TASK: { RestComputeTaskRequest restReq0 = new RestComputeTaskRequest(); - List<Object> funcs = values("f", params); - List<Object> nodes = values("n", params); - - List<Object> args = values("args", params); - - assert funcs.size() == nodes.size(); - - List<T3<String, String, String>> mapping = new ArrayList<>(); - - - for (int i = 0; i < funcs.size(); ++i) - mapping.add(new T3((String) funcs.get(i), (String)nodes.get(i), (String)args.get(i))); - - restReq0.mapping(mapping); + restReq0.mapFunc((String)params.get("map")); + restReq0.argument((String) params.get("arg")); + restReq0.reduceFunc((String) params.get("reduce")); restReq = restReq0;