Repository: incubator-ignite Updated Branches: refs/heads/ignite-965 6d3218c38 -> b42bd3d3c
#ignite-965: start implementing IgniteComputeTask for nodejs Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b42bd3d3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b42bd3d3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b42bd3d3 Branch: refs/heads/ignite-965 Commit: b42bd3d3c6ba1e433641f5a1dff40c48522414ce Parents: 6d3218c Author: ivasilinets <ivasilin...@gridgain.com> Authored: Mon Jun 22 20:53:30 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Mon Jun 22 20:53:30 2015 +0300 ---------------------------------------------------------------------- .../computegrid/ComputeTaskMapExample.java | 2 +- .../processors/rest/GridRestCommand.java | 8 +- .../processors/rest/GridRestProcessor.java | 1 + .../compute/IgniteComputeCommandHandler.java | 55 +++++++--- .../IgniteComputeTaskCommandHandler.java | 67 ++++++++++++ .../rest/request/RestComputeTaskRequest.java | 42 ++++++++ .../scripting/IgniteScriptProcessor.java | 15 +++ modules/nodejs/src/main/js/apache-ignite.js | 3 +- modules/nodejs/src/main/js/compute-task.js | 38 +++++++ modules/nodejs/src/main/js/compute.js | 102 +++++++++++++++++-- .../ignite/internal/NodeJsComputeSelfTest.java | 18 +++- .../nodejs/src/test/js/simple-compute-task.js | 57 +++++++++++ modules/nodejs/src/test/js/test-compute.js | 71 +++++++++++-- .../http/jetty/GridJettyRestHandler.java | 24 ++++- 14 files changed, 462 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b42bd3d3/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeTaskMapExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeTaskMapExample.java b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeTaskMapExample.java index a06299f..9ca51f7 100644 --- a/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeTaskMapExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/computegrid/ComputeTaskMapExample.java @@ -79,7 +79,7 @@ public class ComputeTaskMapExample { Iterator<ClusterNode> it = nodes.iterator(); - for (final String word : arg.split(" ")) { + for (final String word : words) { // If we used all nodes, restart the iterator. if (!it.hasNext()) it = nodes.iterator(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b42bd3d3/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java index a539180..91022fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java @@ -97,7 +97,13 @@ public enum GridRestCommand { QUIT("quit"), /** Affinity run. */ - AFFINITY_RUN("affrun"); + AFFINITY_RUN("affrun"), + + /** Affinity call. */ + AFFINITY_CALL("affcall"), + + /** Execute task. */ + EXECUTE_TASK("exectask"); /** Enum values. */ private static final GridRestCommand[] VALS = values(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b42bd3d3/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java index 3377dcd..c156101 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java @@ -254,6 +254,7 @@ public class GridRestProcessor extends GridProcessorAdapter { addHandler(new GridVersionCommandHandler(ctx)); addHandler(new DataStructuresCommandHandler(ctx)); addHandler(new IgniteComputeCommandHandler(ctx)); + addHandler(new IgniteComputeTaskCommandHandler(ctx)); // Start protocols. startTcpProtocol(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b42bd3d3/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/compute/IgniteComputeCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/compute/IgniteComputeCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/compute/IgniteComputeCommandHandler.java index f3da040..345a898 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/compute/IgniteComputeCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/compute/IgniteComputeCommandHandler.java @@ -37,7 +37,8 @@ import static org.apache.ignite.internal.processors.rest.GridRestCommand.*; */ public class IgniteComputeCommandHandler extends GridRestCommandHandlerAdapter { /** Supported commands. */ - private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(AFFINITY_RUN); + private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(AFFINITY_RUN, + AFFINITY_CALL); /** * @param ctx Context. @@ -61,20 +62,42 @@ public class IgniteComputeCommandHandler extends GridRestCommandHandlerAdapter { final RestComputeRequest req0 = (RestComputeRequest) req; - ctx.grid().compute().affinityRun(req0.cacheName(), req0.key(), new IgniteRunnable() { - @IgniteInstanceResource - private Ignite ignite; - - @Override public void run() { - try { - ((IgniteKernal)ignite).context().scripting().runJS(req0.function()); - } - catch (ScriptException e) { - throw new IgniteException(e); - } - } - }); - - return new GridFinishedFuture<>(new GridRestResponse("AFFINITY RUN " + req)); + switch (req.command()) { + case AFFINITY_RUN: + ctx.grid().compute().affinityRun(req0.cacheName(), req0.key(), new IgniteRunnable() { + @IgniteInstanceResource + private Ignite ignite; + + @Override public void run() { + try { + ((IgniteKernal) ignite).context().scripting().runJS(req0.function()); + } + catch (ScriptException e) { + throw new IgniteException(e); + } + } + }); + + return new GridFinishedFuture<>(new GridRestResponse("AFFINITY RUN " + req)); + + case AFFINITY_CALL: + Object res = ctx.grid().compute().affinityCall(req0.cacheName(), req0.key(), new IgniteCallable<Object>() { + @IgniteInstanceResource + private Ignite ignite; + + @Override public Object call() { + try { + return ((IgniteKernal) ignite).context().scripting().runJS(req0.function()); + } + catch (ScriptException e) { + throw new IgniteException(e); + } + } + }); + + return new GridFinishedFuture<>(new GridRestResponse("AFFINITY RUN " + res)); + } + + return new GridFinishedFuture<>(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b42bd3d3/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/compute/IgniteComputeTaskCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/compute/IgniteComputeTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/compute/IgniteComputeTaskCommandHandler.java new file mode 100644 index 0000000..2c7ec5b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/compute/IgniteComputeTaskCommandHandler.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.handlers.compute; + +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.rest.*; +import org.apache.ignite.internal.processors.rest.handlers.*; +import org.apache.ignite.internal.processors.rest.request.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +import static org.apache.ignite.internal.processors.rest.GridRestCommand.*; + +/** + * Compute task command handler. + */ +public class IgniteComputeTaskCommandHandler extends GridRestCommandHandlerAdapter { + /** Supported commands. */ + private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(EXECUTE_TASK); + + /** + * @param ctx Context. + */ + public IgniteComputeTaskCommandHandler(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public Collection<GridRestCommand> supportedCommands() { + return SUPPORTED_COMMANDS; + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<GridRestResponse> handleAsync(GridRestRequest req) { + assert req != null; + + assert req instanceof RestComputeTaskRequest : "Invalid type of compute request."; + + assert SUPPORTED_COMMANDS.contains(req.command()); + + final RestComputeTaskRequest req0 = (RestComputeTaskRequest) req; + + Map<String, String> mapping = req0.mapping(); + + + + + return new GridFinishedFuture<>(new GridRestResponse("5")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b42bd3d3/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestComputeTaskRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestComputeTaskRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestComputeTaskRequest.java new file mode 100644 index 0000000..74eb3cf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestComputeTaskRequest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.request; + +import java.util.*; + +/** + * Compute task request. + */ +public class RestComputeTaskRequest extends GridRestRequest { + /** Mapping tasks to nodes. */ + private Map<String, String> mapping; + + /** + * @param mapping Mapping tasks to nodes. + */ + public void mapping(Map<String, String> mapping) { + this.mapping = mapping; + } + + /** + * @return Mapping tasks to nodes. + */ + public Map<String, String> mapping() { + return mapping; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b42bd3d3/modules/core/src/main/java/org/apache/ignite/internal/processors/scripting/IgniteScriptProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/scripting/IgniteScriptProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/scripting/IgniteScriptProcessor.java index bf0d063..1e333f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/scripting/IgniteScriptProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/scripting/IgniteScriptProcessor.java @@ -59,8 +59,23 @@ public class IgniteScriptProcessor extends GridProcessorAdapter { public Object runJS(String script) throws ScriptException { ScriptEngine engine = factory.getEngineByName("JavaScript"); + Bindings b = engine.createBindings(); + + b.put("ignite", new Ignite()); + + engine.setBindings(b, ScriptContext.ENGINE_SCOPE); + script = "(" + script + ")();"; return engine.eval(script); } + + /** + * Ignite JS binding. + */ + public static class Ignite { + public void hello() { + System.out.println("HELLO HAPPY WORLD!!!"); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b42bd3d3/modules/nodejs/src/main/js/apache-ignite.js ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/main/js/apache-ignite.js b/modules/nodejs/src/main/js/apache-ignite.js index e03334d..0df6160 100644 --- a/modules/nodejs/src/main/js/apache-ignite.js +++ b/modules/nodejs/src/main/js/apache-ignite.js @@ -19,5 +19,6 @@ module.exports = { Cache : require('./cache.js').Cache, Ignition : require('./ignition.js').Ignition, Server : require('./server.js').Server, - Ignite : require('./ignite.js').Ignite + Ignite : require('./ignite.js').Ignite, + Compute : require('./compute.js').Compute } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b42bd3d3/modules/nodejs/src/main/js/compute-task.js ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/main/js/compute-task.js b/modules/nodejs/src/main/js/compute-task.js new file mode 100644 index 0000000..d13f361 --- /dev/null +++ b/modules/nodejs/src/main/js/compute-task.js @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @constructor + * @this {ComputeTask} + */ +function ComputeTask() { +} + +/** + * @param {string[]} nodes Nodes id + * @param {string} arg Argument + * @returns {Object.<string, Cache~onGet>} Map of grid jobs assigned to subgrid node. Unless {@link ComputeTaskContinuousMapper} is + * injected into task, if {@code null} or empty map is returned, exception will be thrown + */ +ComputeTask.prototype.map = function(nodes, arg) { +} + +/** + * @param {string[]} results Results + */ +ComputeTask.prototype.reduce = function(results) { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b42bd3d3/modules/nodejs/src/main/js/compute.js ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/main/js/compute.js b/modules/nodejs/src/main/js/compute.js index 2ab68eb..fe0820b 100644 --- a/modules/nodejs/src/main/js/compute.js +++ b/modules/nodejs/src/main/js/compute.js @@ -16,6 +16,7 @@ */ var Server = require("./server").Server; +var ComputeTask = require("./compute-task").ComputeTask; /** * @constructor @@ -33,17 +34,104 @@ function Compute(server) { /** * @this {Compute} - * @param {String} cacheName Cache name. - * @param {String} key Key. + * @param {string} cacheName Cache name. + * @param {string} key Key. * @param {Compute~runnable} runnable Function without parameters * @param {Cache~noValue} callback Callback */ Compute.prototype.affinityRun = function(cacheName, key, runnable, callback) { - var f = runnable.toString(); - var qs = require('querystring'); - f = qs.escape(f); this._server.runCommand("affrun", [Server.pair("cacheName", cacheName), - Server.pair("key", key), Server.pair("func", f)], callback); + Server.pair("key", key), Server.pair("func", this._escape(runnable))], callback); +} + +/** + * @this {Compute} + * @param {string} cacheName Cache name. + * @param {string} key Key. + * @param {Compute~runnable} runnable Function without parameters + * @param {Cache~onGet} callback Callback + */ +Compute.prototype.affinityCall = function(cacheName, key, runnable, callback) { + this._server.runCommand("affcall", [Server.pair("cacheName", cacheName), + Server.pair("key", key), Server.pair("func", this._escape(runnable))], callback); +} + +/** + * @param{Cache~noValue} f Function + * @returns {string} Encoding function + */ +Compute.prototype._escape = function(f) { + var f = f.toString(); + var qs = require('querystring'); + return qs.escape(f); +} + +/** + * @this {Compute} + * @param {ComputeTask} task Compute task + * @param {string} arg Argument + * @param {} callback Callback + */ +Compute.prototype.execute = function(task, arg, callback) { + this._nodes(this._onNodesExecute.bind(this, task, arg, callback)); +} + +Compute.prototype._nodes = function(callback) { + this._server.runCommand("top", [Server.pair("mtr", "false"), Server.pair("attr", "false")], + this._onNodes.bind(this, callback)) +} + +Compute.prototype._onNodes = function(callback, error, results) { + if (error) { + callback.call(null, error, null); + + return; + } + + var nodes = []; + + for (var res of results) { + nodes.push(res["nodeId"]) + } + + callback.call(null, null, nodes); +} + +Compute.prototype._onNodesExecute = function(task, arg, callback, err, nodes) { + if (err) { + callback.call(null, error, null); + + return; + } + + var taskMap = task.map(nodes, arg); + + var params = []; + var i = 0; + + console.log("TASK" + taskMap); + for (var f in taskMap) { + params.push(Server.pair("f" + i, this._escape(f))); + params.push(Server.pair("n" + i, taskMap[f])); + i++; + } + + this._server.runCommand("exectask", params, this._onResExecute.bind(this, task, callback)); +} + + +Compute.prototype._onResExecute = function(task, callback, err, results) { + if (err) { + callback.call(null, err, null); + + return; + } + + console.log("ON RES EXEC = " + results); + + var res = task.reduce(results); + + callback.call(null, null, res); } -exports.Compute = Compute; \ No newline at end of file +exports.Compute = Compute \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b42bd3d3/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsComputeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsComputeSelfTest.java b/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsComputeSelfTest.java index bd66b1b..e6ec550 100644 --- a/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsComputeSelfTest.java +++ b/modules/nodejs/src/test/java/org/apache/ignite/internal/NodeJsComputeSelfTest.java @@ -39,7 +39,21 @@ public class NodeJsComputeSelfTest extends NodeJsAbstractTest { /** * @throws Exception If failed. */ - public void testCompute() throws Exception { - runJsScript("testCompute"); + public void testComputeAffinityRun() throws Exception { + runJsScript("testComputeAffinityRun"); + } + + /** + * @throws Exception If failed. + */ + public void testComputeAffinityCall() throws Exception { + runJsScript("testComputeAffinityCall"); + } + + /** + * @throws Exception If failed. + */ + public void testComputeExecute() throws Exception { + runJsScript("testComputeExecute"); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b42bd3d3/modules/nodejs/src/test/js/simple-compute-task.js ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/test/js/simple-compute-task.js b/modules/nodejs/src/test/js/simple-compute-task.js new file mode 100644 index 0000000..6e55c55 --- /dev/null +++ b/modules/nodejs/src/test/js/simple-compute-task.js @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +function CharacterCountTask() { +} + +CharacterCountTask.prototype.map = function(nodes, arg) { + var words = arg.split(" "); + + var map = {}; + + var nodeId = 0; + + for (var word of words) { + var node = nodes[nodeId]; + + if (nodeId < nodes.length - 1) { + nodeId++; + } + + var f = function() { + println(">>> Printing " + word); + + return word.length; + } + + map[f] = node; + } + + return map; +} + +CharacterCountTask.prototype.reduce = function(results) { + var sum = 0; + + for (var res of results) { + sum += parseInt(res, 10); + } + + return sum; +} + +exports.CharacterCountTask = CharacterCountTask http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b42bd3d3/modules/nodejs/src/test/js/test-compute.js ---------------------------------------------------------------------- diff --git a/modules/nodejs/src/test/js/test-compute.js b/modules/nodejs/src/test/js/test-compute.js index f7eb9b2..b8606ac 100644 --- a/modules/nodejs/src/test/js/test-compute.js +++ b/modules/nodejs/src/test/js/test-compute.js @@ -23,40 +23,89 @@ var Server = Apache.Server; var assert = require("assert"); -testCompute = function() { - TestUtils.startIgniteNode(onStart.bind(null)); +testComputeAffinityRun = function() { + TestUtils.startIgniteNode(onStart.bind(null, onPut)); } -function onStart(error, ignite) { +testComputeAffinityCall = function() { + TestUtils.startIgniteNode(onStart.bind(null, onPut1)); +} + +testComputeExecute = function() { + var CharacterCountTask = require("./simple-compute-task").CharacterCountTask + + var task = new CharacterCountTask(); + + TestUtils.startIgniteNode(onStart1.bind(null, task)); +} + +function onStart(locOnPut, error, ignite) { var cache = ignite.cache("mycache"); - var params = {"key0" : "val0"} + var params = {} - for (var i = 0; i < 1000; ++i) + for (var i = 900; i < 1000; ++i) params["key" + i] = "val" + i; - cache.putAll(params, onPut.bind(null, ignite)) - + cache.putAll(params, locOnPut.bind(null, ignite)) } function onPut(ignite, error) { var comp = ignite.compute(); var f = function () { - print("Hello world!"); + println("Hello world!"); + + ignite.hello(); } comp.affinityRun("mycache", "key999", f, onError.bind(null)); } -function onError(error, res) { +function onError(error) { + console.log("Error " + error); + + assert(error == null); + + TestUtils.testDone(); +} + +function onPut1(ignite, error) { + var comp = ignite.compute(); + + var f = function () { + println("Hello world!"); + + ignite.hello(); + } + + comp.affinityCall("mycache", "key999", f, onError1.bind(null)); +} + +function onError1(error, res) { console.log("Error " + error); assert(error == null); - assert(res.indexOf("AFFINITY RUN") !== -1); + assert(res.indexOf("AFFINITY CALL") !== -1); console.log("!!!!!!!!RES = " + res); TestUtils.testDone(); -} \ No newline at end of file +} + +function onStart1(task, error, ignite) { + var comp = ignite.compute(); + + comp.execute(task, "Hi Alice", onComputeResult); +} + +function onComputeResult(error, res) { + console.log("Error " + error); + + assert(error == null); + + console.log("!!!!!!!!EXECUTE TASK RESULT = " + res); + + TestUtils.testDone(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b42bd3d3/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java ---------------------------------------------------------------------- diff --git a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java index 8d3f018..ff5fb7e 100644 --- a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java +++ b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java @@ -447,8 +447,8 @@ public class GridJettyRestHandler extends AbstractHandler { break; } - case AFFINITY_RUN: { - System.out.println("!!!!!!!AFFINITY RUN"); + case AFFINITY_RUN: + case AFFINITY_CALL: { RestComputeRequest restReq0 = new RestComputeRequest(); restReq0.function((String)params.get("func")); @@ -459,6 +459,26 @@ public class GridJettyRestHandler extends AbstractHandler { break; } + case EXECUTE_TASK: { + RestComputeTaskRequest restReq0 = new RestComputeTaskRequest(); + + List<Object> funcs = values("f", params); + List<Object> nodes = values("n", params); + + assert funcs.size() == nodes.size(); + + Map<String, String> mapping = new HashMap<>(); + + for (int i = 0; i < funcs.size(); ++i) + mapping.put((String) funcs.get(i), (String)nodes.get(i)); + + restReq0.mapping(mapping); + + restReq = restReq0; + + break; + } + default: throw new IgniteCheckedException("Invalid command: " + cmd); }