Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-965 01c9c3acb -> 47276a81e


#ignite-965: map and reduce are executed on the server.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d481ea8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d481ea8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d481ea8c

Branch: refs/heads/ignite-965
Commit: d481ea8c76f9de8e0e94e6a2ee7a46d3788c1442
Parents: 01c9c3a
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Tue Jun 23 18:08:27 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Tue Jun 23 18:08:27 2015 +0300

----------------------------------------------------------------------
 .../IgniteComputeTaskCommandHandler.java        | 102 ++++++++++++++-----
 .../rest/request/RestComputeTaskRequest.java    |  38 +++++--
 .../scripting/IgniteScriptProcessor.java        |  22 +++-
 modules/nodejs/src/main/js/apache-ignite.js     |   3 +-
 modules/nodejs/src/main/js/compute-task.js      |  38 -------
 modules/nodejs/src/main/js/compute.js           |  75 ++------------
 modules/nodejs/src/test/js/test-compute.js      |  37 ++++---
 modules/nodejs/src/test/js/test-utils.js        |  10 +-
 .../http/jetty/GridJettyRestHandler.java        |  17 +---
 9 files changed, 161 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d481ea8c/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/compute/IgniteComputeTaskCommandHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/compute/IgniteComputeTaskCommandHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/compute/IgniteComputeTaskCommandHandler.java
index a3708ab..7329a5b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/compute/IgniteComputeTaskCommandHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/compute/IgniteComputeTaskCommandHandler.java
@@ -25,12 +25,11 @@ import org.apache.ignite.internal.processors.rest.*;
 import org.apache.ignite.internal.processors.rest.handlers.*;
 import org.apache.ignite.internal.processors.rest.request.*;
 import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.resources.*;
 import org.jetbrains.annotations.*;
 
-import javax.script.ScriptException;
+import javax.script.*;
 import java.util.*;
 
 import static org.apache.ignite.internal.processors.rest.GridRestCommand.*;
@@ -64,25 +63,28 @@ public class IgniteComputeTaskCommandHandler extends 
GridRestCommandHandlerAdapt
 
         final RestComputeTaskRequest req0 = (RestComputeTaskRequest) req;
 
-        List<T3<String, String, String>> mapping =  req0.mapping();
-
-        Object res = ctx.grid().compute().execute(new JsTask(mapping, ctx), 
null);
+        Object res = ctx.grid().compute().execute(new JsTask(req0.mapFunc(), 
req0.argument(), req0.reduceFunc(), ctx), null);
 
         return new GridFinishedFuture<>(new GridRestResponse(res));
     }
 
     private static class JsTask extends ComputeTaskAdapter<String, Object> {
-        /** Mapping. */
-        private List<T3<String, String, String>> mapping;
+        /** Mapping function. */
+        private String mapFunc;
+
+        private String reduceFunc;
 
         /** Grid kernal context. */
         private GridKernalContext ctx;
 
+        private String arg;
+
         /**
-         * @param mapping Task mapping.
          */
-        public JsTask(List<T3<String, String, String>> mapping, 
GridKernalContext ctx) {
-            this.mapping = mapping;
+        public JsTask(String mapFunc, String arg, String reduceFunc, 
GridKernalContext ctx) {
+            this.mapFunc = mapFunc;
+            this.reduceFunc = reduceFunc;
+            this.arg = arg;
             this.ctx = ctx;
         }
 
@@ -90,27 +92,68 @@ public class IgniteComputeTaskCommandHandler extends 
GridRestCommandHandlerAdapt
         @Override public Map<? extends ComputeJob, ClusterNode> 
map(List<ClusterNode> nodes, String arg) {
             Map<ComputeJob, ClusterNode> map = new HashMap<>();
 
-            for (final T3<String, String, String> job : mapping) {
-                UUID nodeId = UUID.fromString(job.get2());
+            String nodesIds = "[";
 
-                ClusterNode node = ctx.grid().cluster().node(nodeId);
+            for (ClusterNode node : nodes)
+                nodesIds += "\""  + node.id().toString() + "\"" + ",";
 
-                map.put(new ComputeJobAdapter() {
-                    /** Ignite. */
-                    @IgniteInstanceResource
-                    private Ignite ignite;
+            nodesIds = nodesIds.substring(0, nodesIds.length() - 1) + "]";
 
-                    @Override public Object execute() throws IgniteException {
-                        System.out.println("Compute job on node " + 
ignite.cluster().localNode().id());
+            try {
+                String newMap = new String("function () {\n" +
+                    "   var res = [];\n" +
+                    "   var resCont = function(f, args, nodeId) {\n" +
+                    "       res.push([f.toString(), args, nodeId])\n" +
+                    "   }\n" +
+                    "   var locF = " + mapFunc + "; \n locF(" +
+                        nodesIds + ", " +
+                    "\"" + this.arg + "\"" +
+                    ", resCont.bind(null)" + ");\n" +
+                    "   return res;\n" +
+                    "}");
 
-                        try {
-                            return 
((IgniteKernal)ignite).context().scripting().runJS(job.get1(), job.get3());
-                        }
-                        catch (ScriptException e) {
-                            throw new IgniteException(e);
+                List mapRes = (List)ctx.scripting().runJS(newMap);
+
+                for (Object arr : mapRes) {
+                    Object[] nodeTask = ((List)arr).toArray();
+
+                    final String func = (String)nodeTask[0];
+
+                    final List argv = (List) nodeTask[1];
+
+                    String nodeIdStr = (String) nodeTask[2];
+
+                    UUID nodeId = UUID.fromString(nodeIdStr);
+
+                    ClusterNode node = ctx.grid().cluster().node(nodeId);
+
+                    map.put(new ComputeJobAdapter() {
+                        /** Ignite. */
+                        @IgniteInstanceResource
+                        private Ignite ignite;
+
+                        @Override public Object execute() throws 
IgniteException {
+                            System.out.println("Compute job on node " + 
ignite.cluster().localNode().id());
+                            try {
+                                String[] argv1 = new String[argv.size()];
+
+                                for (int i = 0; i < argv1.length; ++i)
+                                    argv1[i] = "\"" + argv.get(i).toString() + 
"\"";
+
+                                return ctx.scripting().runJS(func, argv1);
+                            }
+                            catch (Exception e) {
+                                throw new IgniteException(e);
+                            }
                         }
-                    }
-                }, node);
+                    }, node);
+
+                }
+            }
+            catch (ScriptException e) {
+                throw new IgniteException(e);
+            }
+            finally {
             }
 
             return map;
@@ -123,7 +166,12 @@ public class IgniteComputeTaskCommandHandler extends 
GridRestCommandHandlerAdapt
             for (ComputeJobResult res : results)
                 data.add(res.getData());
 
-            return data;
+            try {
+                return ctx.scripting().runJS(reduceFunc, new String[] 
{data.toString()});
+            }
+            catch (ScriptException e) {
+                throw new IgniteException(e);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d481ea8c/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestComputeTaskRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestComputeTaskRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestComputeTaskRequest.java
index bae7ce5..89e04da 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestComputeTaskRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestComputeTaskRequest.java
@@ -26,19 +26,35 @@ import java.util.*;
  */
 public class RestComputeTaskRequest extends GridRestRequest {
     /** Mapping tasks to nodes. */
-    private List<T3<String, String, String>> mapping;
+    private String mapFunc;
 
-    /**
-     * @param mapping Mapping tasks to nodes.
-     */
-    public void mapping(List<T3<String, String, String>> mapping) {
-        this.mapping = mapping;
+    /** Function argument. */
+    private String arg;
+
+    private String reduceFunc;
+
+    public void reduceFunc(String reduceFunc) {
+        this.reduceFunc = reduceFunc;
+    }
+
+    public String reduceFunc() {
+
+        return reduceFunc;
+    }
+
+    public String mapFunc() {
+        return mapFunc;
+    }
+
+    public String argument() {
+        return arg;
+    }
+
+    public void mapFunc(String mapFunc) {
+        this.mapFunc = mapFunc;
     }
 
-    /**
-     * @return Mapping tasks to nodes.
-     */
-    public List<T3<String, String, String>> mapping() {
-        return mapping;
+    public void argument(String arg) {
+        this.arg = arg;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d481ea8c/modules/core/src/main/java/org/apache/ignite/internal/processors/scripting/IgniteScriptProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/scripting/IgniteScriptProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/scripting/IgniteScriptProcessor.java
index 54004de..99c206e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/scripting/IgniteScriptProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/scripting/IgniteScriptProcessor.java
@@ -20,8 +20,10 @@ package org.apache.ignite.internal.processors.scripting;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.util.typedef.*;
 
 import javax.script.*;
+import java.util.*;
 
 /**
  * Ignite scripting manager.
@@ -46,7 +48,7 @@ public class IgniteScriptProcessor extends 
GridProcessorAdapter {
      * @throws ScriptException If script failed.
      */
     public Object run(String engName, String script) throws ScriptException {
-        if (engName.equals(JAVA_SCRIPT_ENGINE_NAME))
+        if (!engName.equals(JAVA_SCRIPT_ENGINE_NAME))
             throw new IgniteException("Engine is not supported. [engName=" + 
engName + "]");
 
         return runJS(script);
@@ -54,9 +56,11 @@ public class IgniteScriptProcessor extends 
GridProcessorAdapter {
 
     /**
      * @param script Script.
+     * @param args Arguments.
+     * @return Script result.
      * @throws ScriptException If script failed.
      */
-    public Object runJS(String script, String args) throws ScriptException {
+    public Object runJS(String script, String[] args) throws ScriptException {
         ScriptEngine engine = factory.getEngineByName("JavaScript");
 
         Bindings b = engine.createBindings();
@@ -65,13 +69,23 @@ public class IgniteScriptProcessor extends 
GridProcessorAdapter {
 
         engine.setBindings(b, ScriptContext.ENGINE_SCOPE);
 
-        script = "(" + script + ")(" + args + ");";
+        script = "(" + script + ")(" ;
+
+        for (int i = 0; i < args.length; ++i)
+            script += args[i] + (i < args.length - 1 ? "," : "");
+
+        script += ");";
 
         return engine.eval(script);
     }
 
+    /**
+     * @param script Script.
+     * @return Script result.
+     * @throws ScriptException If script failed.
+     */
     public Object runJS(String script) throws ScriptException {
-        return runJS(script, "");
+        return runJS(script, new String[]{""});
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d481ea8c/modules/nodejs/src/main/js/apache-ignite.js
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/main/js/apache-ignite.js 
b/modules/nodejs/src/main/js/apache-ignite.js
index f90cf68..0df6160 100644
--- a/modules/nodejs/src/main/js/apache-ignite.js
+++ b/modules/nodejs/src/main/js/apache-ignite.js
@@ -20,6 +20,5 @@ module.exports = {
   Ignition : require('./ignition.js').Ignition,
   Server : require('./server.js').Server,
   Ignite : require('./ignite.js').Ignite,
-  Compute : require('./compute.js').Compute,
-  ComputeJob: require('./compute.js').ComputeJob
+  Compute : require('./compute.js').Compute
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d481ea8c/modules/nodejs/src/main/js/compute-task.js
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/main/js/compute-task.js 
b/modules/nodejs/src/main/js/compute-task.js
deleted file mode 100644
index d13f361..0000000
--- a/modules/nodejs/src/main/js/compute-task.js
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * @constructor
- * @this {ComputeTask}
- */
-function ComputeTask() {
-}
-
-/**
- * @param {string[]} nodes Nodes id
- * @param {string} arg Argument
- * @returns {Object.<string, Cache~onGet>} Map of grid jobs assigned to 
subgrid node. Unless {@link ComputeTaskContinuousMapper} is
- * injected into task, if {@code null} or empty map is returned, exception 
will be thrown
- */
-ComputeTask.prototype.map = function(nodes, arg) {
-}
-
-/**
- * @param {string[]} results Results
- */
-ComputeTask.prototype.reduce = function(results) {
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d481ea8c/modules/nodejs/src/main/js/compute.js
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/main/js/compute.js 
b/modules/nodejs/src/main/js/compute.js
index fb38e7a..7a5f808 100644
--- a/modules/nodejs/src/main/js/compute.js
+++ b/modules/nodejs/src/main/js/compute.js
@@ -16,7 +16,6 @@
  */
 
 var Server = require("./server").Server;
-var ComputeTask = require("./compute-task").ComputeTask;
 
 /**
  * @constructor
@@ -72,76 +71,14 @@ Compute.prototype._escape = function(f) {
  * @param {string} arg  Argument
  * @param {} callback Callback
  */
-Compute.prototype.execute = function(task, arg, callback) {
-  this._nodes(this._onNodesExecute.bind(this, task, arg, callback));
-}
-
-Compute.prototype._nodes = function(callback) {
-  this._server.runCommand("top", [Server.pair("mtr", "false"), 
Server.pair("attr", "false")],
-    this._onNodes.bind(this, callback))
-}
-
-Compute.prototype._onNodes = function(callback, error, results) {
-  if (error) {
-    callback.call(null, error, null);
-
-    return;
-  }
-
-  var nodes = [];
-
-  for (var res of results) {
-    nodes.push(res["nodeId"])
-  }
-
-  callback.call(null, null, nodes);
-}
-
-Compute.prototype._onNodesExecute = function(task, arg, callback, err, nodes) {
-  if (err) {
-      callback.call(null, error, null);
-
-      return;
-  }
-
-  var computeJobList = task.map(nodes, arg);
+Compute.prototype.execute = function(map, reduce, arg, callback) {
+   var params = [];
 
-  var params = [];
-  var i = 1;
+    params.push(Server.pair("map", this._escape(map)));
+    params.push(Server.pair("reduce", this._escape(reduce)));
+    params.push(Server.pair("arg", this._escape(arg)));
 
-  console.log("TASK" + computeJobList);
-  for (var job of computeJobList) {
-    params.push(Server.pair("f" + i, this._escape(job.func)));
-    params.push(Server.pair("args" + i,  JSON.stringify(job.args)));
-    params.push(Server.pair("n" + i, job.node));
-    i++;
-  }
-
-  this._server.runCommand("exectask", params, this._onResExecute.bind(this, 
task, callback));
-}
-
-
-Compute.prototype._onResExecute = function(task, callback, err, results) {
-  if (err) {
-    callback.call(null, err, null);
-
-    return;
-  }
-
-  console.log("ON RES EXEC = " + results);
-
-  var res = task.reduce(results);
-
-  callback.call(null, null, res);
+    this._server.runCommand("exectask", params, callback);
 }
 
 exports.Compute = Compute
-
-
-function ComputeJob(func, args, node) {
-    this.func = func;
-    this.args = args;
-    this.node = node;
-}
-
-exports.ComputeJob = ComputeJob;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d481ea8c/modules/nodejs/src/test/js/test-compute.js
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/test/js/test-compute.js 
b/modules/nodejs/src/test/js/test-compute.js
index 87a284c..c444380 100644
--- a/modules/nodejs/src/test/js/test-compute.js
+++ b/modules/nodejs/src/test/js/test-compute.js
@@ -17,10 +17,6 @@
 
 var TestUtils = require("./test-utils").TestUtils;
 
-var Apache = require(TestUtils.scriptPath());
-var Cache = Apache.Cache;
-var Server = Apache.Server;
-
 var assert = require("assert");
 
 testComputeAffinityRun = function() {
@@ -32,11 +28,7 @@ testComputeAffinityCall = function() {
 }
 
 testComputeExecute = function() {
-  var CharacterCountTask = require("./simple-compute-task").CharacterCountTask
-
-  var task = new CharacterCountTask();
-
-  TestUtils.startIgniteNode(onStart1.bind(null, task));
+  TestUtils.startIgniteNode(onStart1);
 }
 
 function onStart(locOnPut, error, ignite) {
@@ -92,10 +84,31 @@ function onError1(error, res) {
   TestUtils.testDone();
 }
 
-function onStart1(task, error, ignite) {
-  var comp = ignite.compute();
+function onStart1(error, ignite) {
+  var map = function(nodes, arg, emit) {
+    var words = arg.split(" ");
+
+    for (var i = 0; i < words.length; i++) {
+      var f = function (word) {
+        println(">>> Printing " + word);
+
+        return word.length;
+      };
+
+      emit(f, [words[i]], nodes[i %  nodes.length]);
+    }
+  };
+
+  var reduce = function(results) {
+    var sum = 0;
+
+    for (var i = 0; i < results.length; ++i)
+     sum += parseInt(results[i], 10);
+
+    return sum;
+  };
 
-  comp.execute(task, "Hi Alice", onComputeResult);
+  ignite.compute().execute(map, reduce, "Hi Alice", onComputeResult);
 }
 
 function onComputeResult(error, res) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d481ea8c/modules/nodejs/src/test/js/test-utils.js
----------------------------------------------------------------------
diff --git a/modules/nodejs/src/test/js/test-utils.js 
b/modules/nodejs/src/test/js/test-utils.js
index 135da8c..ee83d7f 100644
--- a/modules/nodejs/src/test/js/test-utils.js
+++ b/modules/nodejs/src/test/js/test-utils.js
@@ -129,8 +129,9 @@ TestUtils.testDone = function() {
  * @param {Ignition~onStart} callback Called on connect
  */
 TestUtils.startIgniteNode = function(callback) {
-  var Apache = require(TestUtils.scriptPath());
-  var Ignition = Apache.Ignition;
+  var Ignite = require(TestUtils.scriptPath());
+  var Ignition = Ignite.Ignition;
+
   Ignition.start(['127.0.0.1:9095'], null, callback);
 }
 
@@ -141,8 +142,9 @@ TestUtils.startIgniteNode = function(callback) {
  * @param {Ignition~onStart} callback Called on connect
  */
 TestUtils.startIgniteNodeWithKey = function(secretKey, callback) {
-  var Apache = require(TestUtils.scriptPath());
-  var Ignition = Apache.Ignition;
+  var Ignite = require(TestUtils.scriptPath());
+  var Ignition = Ignite.Ignition;
+
   Ignition.start(['127.0.0.1:9095'], secretKey, callback);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d481ea8c/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
 
b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
index 5fe4cd9..7d71530 100644
--- 
a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
+++ 
b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java
@@ -462,20 +462,9 @@ public class GridJettyRestHandler extends AbstractHandler {
             case EXECUTE_TASK: {
                 RestComputeTaskRequest restReq0 = new RestComputeTaskRequest();
 
-                List<Object> funcs = values("f", params);
-                List<Object> nodes = values("n", params);
-
-                List<Object> args = values("args", params);
-
-                assert funcs.size() == nodes.size();
-
-                List<T3<String, String, String>> mapping = new ArrayList<>();
-
-
-                for (int i = 0; i < funcs.size(); ++i)
-                    mapping.add(new T3((String) funcs.get(i), 
(String)nodes.get(i), (String)args.get(i)));
-
-                restReq0.mapping(mapping);
+                restReq0.mapFunc((String)params.get("map"));
+                restReq0.argument((String) params.get("arg"));
+                restReq0.reduceFunc((String) params.get("reduce"));
 
                 restReq = restReq0;
 

Reply via email to