# IGNITE-1121 Use 'apache-ignite' to connect to cluster via agent.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a1918517 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a1918517 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a1918517 Branch: refs/heads/ignite-1121 Commit: a1918517b172cc3975a70582e372557ca399b622 Parents: f638e73 Author: sevdokimov <sergey.evdoki...@jetbrains.com> Authored: Fri Jul 17 01:57:23 2015 +0300 Committer: sevdokimov <sergey.evdoki...@jetbrains.com> Committed: Fri Jul 17 01:57:23 2015 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/agent/Agent.java | 23 +- .../ignite/agent/messages/RestRequest.java | 34 +++ .../nodejs/agents/agent-manager.js | 294 +++++++++++++++++++ .../nodejs/agents/agent-server.js | 90 ++++++ .../nodejs/agents/agentManager.js | 210 ------------- .../web-control-center/nodejs/routes/test.js | 37 +-- 6 files changed, 449 insertions(+), 239 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1918517/modules/control-center-agent/src/main/java/org/apache/ignite/agent/Agent.java ---------------------------------------------------------------------- diff --git a/modules/control-center-agent/src/main/java/org/apache/ignite/agent/Agent.java b/modules/control-center-agent/src/main/java/org/apache/ignite/agent/Agent.java index 49f45f5a..d34854d 100644 --- a/modules/control-center-agent/src/main/java/org/apache/ignite/agent/Agent.java +++ b/modules/control-center-agent/src/main/java/org/apache/ignite/agent/Agent.java @@ -22,6 +22,7 @@ import org.apache.http.*; import org.apache.http.client.entity.*; import org.apache.http.client.methods.*; import org.apache.http.client.utils.*; +import org.apache.http.entity.*; import org.apache.http.impl.client.*; import org.apache.http.message.*; import org.apache.ignite.agent.messages.*; @@ -85,17 +86,29 @@ public class Agent { builder.addParameter(entry.getKey(), entry.getValue()); } + if (restReq.getHeaders() != null) + restReq.setHeaders(restReq.getHeaders()); + if ("GET".equalsIgnoreCase(restReq.getMethod())) httpReq = new HttpGet(builder.build()); else if ("POST".equalsIgnoreCase(restReq.getMethod())) { - List<NameValuePair> nvps = builder.getQueryParams(); + HttpPost post; + + if (restReq.getBody() == null) { + List<NameValuePair> nvps = builder.getQueryParams(); - builder.clearParameters(); + builder.clearParameters(); - HttpPost post = new HttpPost(builder.build()); + post = new HttpPost(builder.build()); - if (nvps.size() > 0) - post.setEntity(new UrlEncodedFormEntity(nvps)); + if (nvps.size() > 0) + post.setEntity(new UrlEncodedFormEntity(nvps)); + } + else { + post = new HttpPost(builder.build()); + + post.setEntity(new StringEntity(restReq.getBody())); + } httpReq = post; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1918517/modules/control-center-agent/src/main/java/org/apache/ignite/agent/messages/RestRequest.java ---------------------------------------------------------------------- diff --git a/modules/control-center-agent/src/main/java/org/apache/ignite/agent/messages/RestRequest.java b/modules/control-center-agent/src/main/java/org/apache/ignite/agent/messages/RestRequest.java index ed7304b..e1f6db8 100644 --- a/modules/control-center-agent/src/main/java/org/apache/ignite/agent/messages/RestRequest.java +++ b/modules/control-center-agent/src/main/java/org/apache/ignite/agent/messages/RestRequest.java @@ -35,6 +35,12 @@ public class RestRequest extends AbstractMessage { /** */ private String method; + /** */ + private Map<String, String> headers; + + /** */ + private String body; + /** * */ @@ -90,4 +96,32 @@ public class RestRequest extends AbstractMessage { public void setMethod(String mtd) { this.method = mtd; } + + /** + * + */ + public Map<String, String> getHeaders() { + return headers; + } + + /** + * @param headers Headers. + */ + public void setHeaders(Map<String, String> headers) { + this.headers = headers; + } + + /** + * + */ + public String getBody() { + return body; + } + + /** + * @param body Body. + */ + public void setBody(String body) { + this.body = body; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1918517/modules/web-control-center/nodejs/agents/agent-manager.js ---------------------------------------------------------------------- diff --git a/modules/web-control-center/nodejs/agents/agent-manager.js b/modules/web-control-center/nodejs/agents/agent-manager.js new file mode 100644 index 0000000..42d8296 --- /dev/null +++ b/modules/web-control-center/nodejs/agents/agent-manager.js @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +var WebSocketServer = require('ws').Server; + +var config = require('../helpers/configuration-loader.js'); + +var https = require('https'); + +var ignite = require('apache-ignite'); + +var db = require('../db'); + +var fs = require('fs'); + +var AgentServer = require('./agent-server').AgentServer; + +/** + * @constructor + * @param {Number} port + */ +function AgentManager(port) { + this._port = port; + + this._clients = {}; +} + +AgentManager.prototype.startup = function() { + this._server = https.createServer({ + key: fs.readFileSync(config.get('monitor:server:key')), + cert: fs.readFileSync(config.get('monitor:server:cert')), + passphrase: config.get('monitor:server:keyPassphrase') + }); + + this._server.listen(this._port); + + this._wss = new WebSocketServer({ server: this._server }); + + var self = this; + + this._wss.on('connection', function(ws) { + var client = new Client(ws, self); + }); +}; + +/** + * @param userId + * @param {Client} client + */ +AgentManager.prototype._removeClient = function(userId, client) { + var connections = this._clients[userId]; + + if (connections) { + removeFromArray(connections, client); + + if (connections.length == 0) + delete this._clients[userId]; + } +}; + +/** + * @param userId + * @param {Client} client + */ +AgentManager.prototype._addClient = function(userId, client) { + var existingConnections = this._clients[userId]; + + if (!existingConnections) { + existingConnections = []; + + this._clients[userId] = existingConnections; + } + + existingConnections.push(client); +}; + +/** + * @param userId + * @return {Client} + */ +AgentManager.prototype.findClient = function(userId) { + var clientsList = this._clients[userId]; + + if (!clientsList) + return null; + + return clientsList[0]; +}; + +/** + * For tests only!!! + * @return {Client} + */ +AgentManager.prototype.getOneClient = function() { + for (var userId in this._clients) { + if (this._clients.hasOwnProperty(userId)) { + var m = this._clients[userId]; + + if (m.length > 0) + return m[0]; + } + } + + return null; +}; + + +/** + * @constructor + * @param {AgentManager} manager + * @param {WebSocket} ws + */ +function Client(ws, manager) { + var self = this; + + this._manager = manager; + this._ws = ws; + + ws.on('close', function() { + if (self.user) { + self._manager._removeClient(self.user._id, self); + } + }); + + ws.on('message', function (msg) { + self._handleMessage(JSON.parse(msg)) + }); + + this._restCounter = 0; + + this._cbMap = {}; +} + +/** + * @param {String|Object} msg + * @param {Function} cb + */ +Client.prototype.sendMessage = function(msg, cb) { + if (typeof msg == 'object') { + msg = JSON.stringify(msg); + } + + this._ws.send(msg, cb); +}; + +/** + * @param {String} path + * @param {Object} params + * @param {Function} cb + * @param {String} method + * @param {String} body + * @param {Object} headers + */ +Client.prototype.invokeRest = function(path, params, cb, method, body, headers) { + if (typeof(params) != 'object') + throw "'params' argument must be an object"; + + if (typeof(cb) != 'function') + throw "callback must be a function"; + + if (body && typeof(body) != 'string') + throw "body must be a string"; + + if (headers && typeof(headers) != 'object') + throw "headers must be an object"; + + if (!method) + method = 'GET'; + else + method = method.toUpperCase(); + + if (method != 'GET' && method != 'POST') + throw "Unknown HTTP method: " + method; + + var reqId = this._restCounter++; + + this._cbMap[reqId] = cb; + + this.sendMessage({ + id: reqId, + type: 'RestRequest', + method: method, + params: params, + path: path, + body: body, + headers: headers + }, function(err) { + if (err) { + delete this._cbMap[reqId]; + + cb(err) + } + }) +}; + +/** + * @param {Object} msg + */ +Client.prototype._handleMessage = function(msg) { + var self = this; + + switch (msg.type) { + case 'AuthMessage': + var account = db.Account.findByUsername(msg.login, function(err, account) { + if (err) { + ws.send("{type: 'AuthResult', success: false}"); + } + else { + account.authenticate(msg.password, function(err, user, res) { + if (!user) { + self._ws.send(JSON.stringify({type: 'AuthResult', success: false, message: res.message})); + } + else { + self._ws.send("{type: 'AuthResult', success: true}"); + + self._user = account; + + self._manager._addClient(account._id, self); + + self._ignite = new ignite.Ignite(new AgentServer(self)); + } + }); + } + }); + + break; + + case 'RestResult': + var cb = this._cbMap[msg.requestId]; + + if (!cb) + break; + + delete this._cbMap[msg.requestId]; + + if (!msg.executed) { + cb(msg.message) + } + else { + cb(null, msg.code, msg.message) + } + + break; + + default: + this._ws.close() + } +}; + +/** + * @return {Ignite} + */ +Client.prototype.ignite = function() { + return this._ignite; +}; + +function removeFromArray(arr, val) { + var idx; + + while ((idx = arr.indexOf(val)) !== -1) { + arr.splice(idx, 1); + } +} + +exports.AgentManager = AgentManager; + +var manager = null; + +/** + * @return {AgentManager} + */ +exports.getOrCreate = function() { + if (!manager) { + manager = new AgentManager(config.get('monitor:server:port')); + + manager.startup(); + } + + return manager; +}; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1918517/modules/web-control-center/nodejs/agents/agent-server.js ---------------------------------------------------------------------- diff --git a/modules/web-control-center/nodejs/agents/agent-server.js b/modules/web-control-center/nodejs/agents/agent-server.js new file mode 100644 index 0000000..03e550e --- /dev/null +++ b/modules/web-control-center/nodejs/agents/agent-server.js @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Creates an instance of server for Ignite + * + * @constructor + * @this {AgentServer} + * @param {Client} client connected client + */ +function AgentServer(client) { + this._client = client; +} + +/** + * Run http request + * + * @this {AgentServer} + * @param {Command} cmd Command + * @param {callback} callback on finish + */ +AgentServer.prototype.runCommand = function(cmd, callback) { + var params = {cmd: cmd.name()}; + + for (var p of cmd._params) { + params[p.key] = p.value; + } + + var body = undefined; + + var headers = undefined; + + if (cmd._isPost()) { + body = cmd.postData(); + + headers = {'Content-Length': body.length, 'JSONObject': 'true'}; + } + + this._client.invokeRest("ignite", params, function(error, code, message) { + if (error) { + callback(error); + return + } + + if (code !== 200) { + if (code === 401) { + callback.call(null, "Authentication failed. Status code 401."); + } + else { + callback.call(null, "Request failed. Status code " + code); + } + + return; + } + + var igniteResponse; + + try { + igniteResponse = JSON.parse(message); + } + catch (e) { + callback.call(null, e, null); + + return; + } + + if (igniteResponse.successStatus) { + callback.call(null, igniteResponse.error, null) + } + else { + callback.call(null, null, igniteResponse.response); + } + }, cmd._method(), body, headers); +}; + +exports.AgentServer = AgentServer; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1918517/modules/web-control-center/nodejs/agents/agentManager.js ---------------------------------------------------------------------- diff --git a/modules/web-control-center/nodejs/agents/agentManager.js b/modules/web-control-center/nodejs/agents/agentManager.js deleted file mode 100644 index 1050a47..0000000 --- a/modules/web-control-center/nodejs/agents/agentManager.js +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -var WebSocketServer = require('ws').Server; - -var config = require('../helpers/configuration-loader.js'); - -var https = require('https'); - -var db = require('../db'); - -var fs = require('fs'); - -var srv; - -var clients = {}; - -function Client(ws) { - var self = this; - - this.ws = ws; - - ws.on('close', function() { - if (self.userId) { - var connections = clients[self.userId]; - - if (connections) { - removeFromArray(connections, self); - } - } - }); - - ws.on('message', function (msg) { - var m = JSON.parse(msg); - - switch (m.type) { - case 'AuthMessage': - var account = db.Account.findByUsername(m.login, function(err, account) { - if (err) { - ws.send("{type: 'AuthResult', success: false}"); - } - else { - account.authenticate(m.password, function(err, user, res) { - if (!user) { - ws.send(JSON.stringify({type: 'AuthResult', success: false, message: res.message})); - } - else { - ws.send("{type: 'AuthResult', success: true}"); - - self.userId = account._id; - - var existingConnections = clients[account._id]; - - if (!existingConnections) { - existingConnections = []; - - clients[account._id] = existingConnections; - } - - existingConnections.push(self); - } - }); - } - }); - - break; - - case 'RestResult': - var cb = self.cbMap[m.requestId]; - - if (!cb) - break; - - delete self.cbMap[m.requestId]; - - if (!m.executed) { - cb("Failed to execute RESQ query: " + m.message) - } - else { - cb(null, m.code, m.message) - } - - break; - - default: - ws.close() - } - }); - - this.sendMessage = function(msg, cb) { - if (typeof(msg) == 'object') { - msg = JSON.stringify(msg); - } - - ws.send(msg, cb); - }; - - this.restCounter = 0; - - this.cbMap = {}; - - this.invokeRest = function(path, params, cb, method) { - if (typeof(params) != 'object') - throw "'params' argument must be an object"; - - if (typeof(cb) != 'function') - throw "callback must be a function"; - - if (!method) - method = 'GET'; - else - method = method.toUpperCase(); - - if (method != 'GET' && method != 'POST') - throw "Unknown HTTP method: " + method; - - var reqId = this.restCounter++; - - this.cbMap[reqId] = cb; - - this.sendMessage({ - id: reqId, - type: 'RestRequest', - method: method, - params: params, - path: path - }, function(err) { - if (err) { - delete this.cbMap[reqId]; - - cb(err) - } - }) - }; - - this.restGet = function(path, params, cb) { - this.invokeRest(path, params, cb, 'GET'); - }; - - this.restPost = function(path, params, cb) { - this.invokeRest(path, params, cb, 'POST'); - } -} - -function Server() { - var server = https.createServer({ - key: fs.readFileSync(config.get('monitor:server:key')), - cert: fs.readFileSync(config.get('monitor:server:cert')), - passphrase: config.get('monitor:server:keyPassphrase') - }); - - server.listen(config.get('monitor:server:port')); - - var wss = new WebSocketServer({ server: server }); - - wss.on('connection', function(ws) { - var client = new Client(ws); - }) -} - -function removeFromArray(arr, val) { - var idx; - - while ((idx = arr.indexOf(val)) !== -1) { - arr.splice(idx, 1); - } -} - -exports.startServer = function() { - srv = new Server(); -}; - -exports.findClient = function(userId) { - var clientsList = clients[userId]; - - if (!clientsList) - return null; - - return clientsList[0]; -}; - -/** - * For tests only!!! - */ -exports.getOneClient = function() { - for (var userId in clients) { - if (clients.hasOwnProperty(userId)) { - var m = clients[userId]; - - if (m.length > 0) - return m[0]; - } - } - - return null; -}; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1918517/modules/web-control-center/nodejs/routes/test.js ---------------------------------------------------------------------- diff --git a/modules/web-control-center/nodejs/routes/test.js b/modules/web-control-center/nodejs/routes/test.js index 4c066ca..b5bdd82 100644 --- a/modules/web-control-center/nodejs/routes/test.js +++ b/modules/web-control-center/nodejs/routes/test.js @@ -16,45 +16,34 @@ */ var router = require('express').Router(); -var bridge = require('../agents/agentManager'); +var agentManager = require('../agents/agent-manager'); /* GET summary page. */ -router.get('/testGet', function(req, res) { - var c = bridge.getOneClient(); +router.get('/', function(req, res) { + var c = agentManager.getOrCreate().getOneClient(); if (!c) { return res.send("Client not found"); } - c.restGet("ignite", {cmd: 'version'}, function(error, code, message) { - if (error) { - res.send("Failed to execute REST query: " + error); + var html = ""; - return - } + var ignite = c.ignite(); - res.send("code: " + code + '<br>message: ' + message); - }); -}); - -/* GET summary page. */ -router.get('/testPost', function(req, res) { - var c = bridge.getOneClient(); - - if (!c) { - return res.send("Client not found"); - } + ignite.version().then(function (ver) { + html += "version: " + ver + "<br>"; - c.restPost("ignite", {cmd: 'version'}, function(error, code, message) { - if (error) { - res.send("Failed to execute REST query: " + error); + return ignite.cluster() + }).then(function (cluster) { + html += "cluster size: " + cluster.length + "<br>"; - return + for (var i = 0; i < cluster.length; i++) { + html += "#" + cluster[i].nodeId(); } - res.send("code: " + code + '<br>message: ' + message); + res.send(html); }); });