diff --git a/meshagent.js b/meshagent.js index 15b86ce5..e6ac32c6 100644 --- a/meshagent.js +++ b/meshagent.js @@ -62,7 +62,11 @@ module.exports.CreateMeshAgent = function (parent, db, ws, req, args, domain) { // Other clean up may be needed here if (obj.unauth) { delete obj.unauth; } - if (obj.agentUpdate != null) { obj.fs.close(obj.agentUpdate.fd); obj.agentUpdate = null; } + if (obj.agentUpdate != null) { + obj.fs.close(obj.agentUpdate.fd); + obj.parent.parent.taskLimiter.completed(obj.agentUpdate.taskid); // Indicate this task complete + obj.agentUpdate = null; + } if (((obj.agentInfo) && (obj.agentInfo.capabilities) && (obj.agentInfo.capabilities & 0x20)) || ((mesh) && (mesh.flags) && (mesh.flags & 1))) { // This is a temporary agent, remote it // Delete this node including network interface information and events obj.db.Remove(obj.dbNodeKey); // Remove node with that id @@ -163,38 +167,42 @@ module.exports.CreateMeshAgent = function (parent, db, ws, req, args, domain) { if ((msg.length == 52) && (obj.agentExeInfo != null) && (obj.agentExeInfo.update == true)) { var agenthash = obj.common.rstr2hex(msg.substring(4)).toLowerCase(); if ((agenthash != obj.agentExeInfo.hash) && (agenthash != '000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000')) { - // Mesh agent update required - if (obj.nodeid != null) { obj.parent.parent.debug(1, 'Agent update required, NodeID=0x' + obj.nodeid.substring(0, 16) + ', ' + obj.agentExeInfo.desc); } - obj.fs.open(obj.agentExeInfo.path, 'r', function (err, fd) { - if (err) { return console.error(err); } - obj.agentUpdate = { oldHash: agenthash, ptr: 0, buf: Buffer.alloc(agentUpdateBlockSize + 4), fd: fd }; + // Mesh agent update required, do it using task limiter so not to flood the network. + obj.parent.parent.taskLimiter.launch(function (argument, taskid, taskLimiterQueue) { + if (obj.nodeid != null) { obj.parent.parent.debug(1, 'Agent update required, NodeID=0x' + obj.nodeid.substring(0, 16) + ', ' + obj.agentExeInfo.desc); } + obj.fs.open(obj.agentExeInfo.path, 'r', function (err, fd) { + if (err) { return console.error(err); } + obj.agentUpdate = { oldHash: agenthash, ptr: 0, buf: Buffer.alloc(agentUpdateBlockSize + 4), fd: fd, taskid: taskid }; - // MeshCommand_CoreModule, ask mesh agent to clear the core. - // The new core will only be sent after the agent updates. - obj.send(obj.common.ShortToStr(10) + obj.common.ShortToStr(0)); + // MeshCommand_CoreModule, ask mesh agent to clear the core. + // The new core will only be sent after the agent updates. + obj.send(obj.common.ShortToStr(10) + obj.common.ShortToStr(0)); - // We got the agent file open on the server side, tell the agent we are sending an update starting with the SHA384 hash of the result - //console.log("Agent update file open."); - obj.send(obj.common.ShortToStr(13) + obj.common.ShortToStr(0)); // Command 13, start mesh agent download + // We got the agent file open on the server side, tell the agent we are sending an update starting with the SHA384 hash of the result + //console.log("Agent update file open."); + obj.send(obj.common.ShortToStr(13) + obj.common.ShortToStr(0)); // Command 13, start mesh agent download + + // Send the first mesh agent update data block + obj.agentUpdate.buf[0] = 0; + obj.agentUpdate.buf[1] = 14; + obj.agentUpdate.buf[2] = 0; + obj.agentUpdate.buf[3] = 1; + var len = -1; + try { len = obj.fs.readSync(obj.agentUpdate.fd, obj.agentUpdate.buf, 4, agentUpdateBlockSize, obj.agentUpdate.ptr); } catch (e) { } + if (len == -1) { + // Error reading the agent file, stop here. + obj.fs.close(obj.agentUpdate.fd); + obj.parent.parent.taskLimiter.completed(obj.agentUpdate.taskid); // Indicate this task complete + obj.agentUpdate = null; + } else { + // Send the first block to the agent + obj.agentUpdate.ptr += len; + //console.log("Agent update send first block: " + len); + obj.send(obj.agentUpdate.buf); // Command 14, mesh agent first data block + } + }); + }, null); - // Send the first mesh agent update data block - obj.agentUpdate.buf[0] = 0; - obj.agentUpdate.buf[1] = 14; - obj.agentUpdate.buf[2] = 0; - obj.agentUpdate.buf[3] = 1; - var len = -1; - try { len = obj.fs.readSync(obj.agentUpdate.fd, obj.agentUpdate.buf, 4, agentUpdateBlockSize, obj.agentUpdate.ptr); } catch (e) { } - if (len == -1) { - // Error reading the agent file, stop here. - obj.fs.close(obj.agentUpdate.fd); - obj.agentUpdate = null; - } else { - // Send the first block to the agent - obj.agentUpdate.ptr += len; - //console.log("Agent update send first block: " + len); - obj.send(obj.agentUpdate.buf); // Command 14, mesh agent first data block - } - }); } else { // Check the mesh core, if the agent is capable of running one if (((obj.agentInfo.capabilities & 16) != 0) && (obj.parent.parent.meshAgentsArchitectureNumbers[obj.agentInfo.agentId].core != null)) { @@ -212,6 +220,7 @@ module.exports.CreateMeshAgent = function (parent, db, ws, req, args, domain) { if (len == -1) { // Error reading the agent file, stop here. obj.fs.close(obj.agentUpdate.fd); + obj.parent.parent.taskLimiter.completed(obj.agentUpdate.taskid); // Indicate this task complete obj.agentUpdate = null; } else { // Send the next block to the agent @@ -223,6 +232,7 @@ module.exports.CreateMeshAgent = function (parent, db, ws, req, args, domain) { //console.log("Agent update sent"); obj.send(obj.common.ShortToStr(13) + obj.common.ShortToStr(0) + obj.common.hex2rstr(obj.agentExeInfo.hash)); // Command 13, end mesh agent download, send agent SHA384 hash obj.fs.close(obj.agentUpdate.fd); + obj.parent.parent.taskLimiter.completed(obj.agentUpdate.taskid); // Indicate this task complete obj.agentUpdate = null; } } diff --git a/swarmserver.js b/swarmserver.js index 68a1b9ac..1613dd0a 100644 --- a/swarmserver.js +++ b/swarmserver.js @@ -214,7 +214,12 @@ module.exports.CreateSwarmServer = function (parent, db, args, certificates) { socket.tag.update = obj.migrationAgents[nodeblock.agenttype][nextAgentVersion]; socket.tag.updatePtr = 0; //console.log('Performing legacy agent update from ' + nodeblock.agentversion + '.' + nodeblock.agenttype + ' to ' + socket.tag.update.ver + '.' + socket.tag.update.arch + ' on ' + nodeblock.agentname + '.'); - obj.SendCommand(socket, LegacyMeshProtocol.GETSTATE, common.IntToStr(5) + common.IntToStr(0)); // agent.SendQuery(5, 0); // Start the agent download + + // Start the agent download using the task limiter so not to flood the server. + obj.parent.taskLimiter.launch(function (socket, taskid, taskLimiterQueue) { + socket.tag.taskid = taskid; + obj.SendCommand(socket, LegacyMeshProtocol.GETSTATE, common.IntToStr(5) + common.IntToStr(0)); // agent.SendQuery(5, 0); // Start the agent download + }, socket); } else { //console.log('No legacy agent update for ' + nodeblock.agentversion + '.' + nodeblock.agenttype + ' on ' + nodeblock.agentname + '.'); } @@ -248,6 +253,8 @@ module.exports.CreateSwarmServer = function (parent, db, args, certificates) { // Send end-of-transfer obj.SendCommand(socket, LegacyMeshProtocol.GETSTATE, common.IntToStr(7) + common.IntToStr(socket.tag.update.binary.length)); //agent.SendQuery(7, AgentFileLen); Debug(3, 'Swarm:Sending end of agent, ptr = ' + socket.tag.updatePtr); + obj.parent.taskLimiter.completed(socket.tag.taskid); // Indicate this task complete + delete socket.tag.taskid; delete socket.tag.update; delete socket.tag.updatePtr; } @@ -274,9 +281,11 @@ module.exports.CreateSwarmServer = function (parent, db, args, certificates) { socket.addListener("close", function () { Debug(1, 'Swarm:Connection closed'); - try { delete obj.ciraConnections[socket.tag.nodeid]; } catch (e) { } - obj.parent.ClearConnectivityState(socket.tag.meshid, socket.tag.nodeid, 2); if (socket.pingTimer != null) { clearInterval(socket.pingTimer); delete socket.pingTimer; } + if (socket.tag && (typeof socket.tag.taskid == 'number')) { + obj.parent.taskLimiter.completed(socket.tag.taskid); // Indicate this task complete + delete socket.tag.taskid; + } }); socket.addListener("error", function () { @@ -344,8 +353,6 @@ module.exports.CreateSwarmServer = function (parent, db, args, certificates) { // Disconnect legacy agent connection obj.close = function (socket) { try { socket.close(); } catch (e) { } - try { delete obj.ciraConnections[socket.tag.nodeid]; } catch (e) { } - obj.parent.ClearConnectivityState(socket.tag.meshid, socket.tag.nodeid, 2); }; obj.SendCommand = function (socket, cmdid, data) {