More MQTT improvements

This commit is contained in:
Ylian Saint-Hilaire 2019-10-04 17:24:30 -07:00
parent 7bba856984
commit 9fbe211cad
9 changed files with 99 additions and 32 deletions

View File

@ -826,7 +826,7 @@ function CreateMeshCentralServer(config, args) {
obj.apfserver = require('./apfserver.js').CreateApfServer(obj, obj.db, obj.args);
// Create MQTT Broker to hook into webserver and mpsserver
if (obj.config.mqtt != null) { obj.mqttbroker = require("./mqttbroker.js").CreateMQTTBroker(obj, obj.db, obj.args); }
if (obj.config.settings.mqtt != null) { obj.mqttbroker = require("./mqttbroker.js").CreateMQTTBroker(obj, obj.db, obj.args); }
// Start the web server and if needed, the redirection web server.
obj.webserver = require('./webserver.js').CreateWebServer(obj, obj.db, obj.args, obj.certificates);
@ -1100,9 +1100,9 @@ function CreateMeshCentralServer(config, args) {
// meshId: mesh identifier of format mesh/domain/meshidhex
// nodeId: node identifier of format node/domain/nodeidhex
// connectTime: time of connection, milliseconds elapsed since the UNIX epoch.
// connectType: Bitmask, 1 = MeshAgent, 2 = Intel AMT CIRA, 4 = Intel AMT local.
// connectType: Bitmask, 1 = MeshAgent, 2 = Intel AMT CIRA, 4 = Intel AMT local, 8 = Intel AMT Relay, 16 = MQTT
// powerState: Value, 0 = Unknown, 1 = S0 power on, 2 = S1 Sleep, 3 = S2 Sleep, 4 = S3 Sleep, 5 = S4 Hibernate, 6 = S5 Soft-Off, 7 = Present
//var connectTypeStrings = ['', 'MeshAgent', 'Intel AMT CIRA', '', 'Intel AMT local'];
//var connectTypeStrings = ['', 'MeshAgent', 'Intel AMT CIRA', '', 'Intel AMT local', '', '', '', 'Intel AMT Relay', '', '', '', '', '', '', '', 'MQTT'];
//var powerStateStrings = ['Unknown', 'Powered', 'Sleep', 'Sleep', 'Deep Sleep', 'Hibernating', 'Soft-Off', 'Present'];
obj.SetConnectivityState = function (meshid, nodeid, connectTime, connectType, powerState, serverid) {
//console.log('SetConnectivity for ' + nodeid.substring(0, 16) + ', Type: ' + connectTypeStrings[connectType] + ', Power: ' + powerStateStrings[powerState] + (serverid == null ? ('') : (', ServerId: ' + serverid)));
@ -1829,7 +1829,7 @@ function mainStart() {
if (require('os').platform() == 'win32') { modules.push('node-windows'); if (sspi == true) { modules.push('node-sspi'); } } // Add Windows modules
if (ldap == true) { modules.push('ldapauth-fork'); }
if (config.letsencrypt != null) { modules.push('greenlock'); modules.push('le-store-certbot'); modules.push('le-challenge-fs'); modules.push('le-acme-core'); } // Add Greenlock Modules
if (config.mqtt != null) { modules.push('mqtt'); modules.push('aedes'); } // Add MQTT Modules
if (config.settings.mqtt != null) { modules.push('aedes'); } // Add MQTT Modules
if (config.settings.mongodb != null) { modules.push('mongodb'); } // Add MongoDB, official driver.
else if (config.settings.xmongodb != null) { modules.push('mongojs'); } // Add MongoJS, old driver.
if (config.smtp != null) { modules.push('nodemailer'); } // Add SMTP support

View File

@ -238,9 +238,11 @@ module.exports.CreateMpsServer = function (parent, db, args, certificates) {
socket.removeAllListeners("close");
socket.setNoDelay(true);
socket.serialtunnel = SerialTunnel();
socket.on('data', function (b) { socket.serialtunnel.updateBuffer(Buffer.from(b, 'binary')) });
socket.serialtunnel.xtransport = 'mps';
socket.serialtunnel.xip = socket.remoteAddress;
socket.on("data", function (b) { socket.serialtunnel.updateBuffer(Buffer.from(b, "binary")) });
socket.serialtunnel.forwardwrite = function (b) { socket.write(b, "binary") }
socket.on("close", function () { socket.serialtunnel.emit('end'); });
socket.on("close", function () { socket.serialtunnel.emit("end"); });
// Pass socket wrapper to the MQTT broker
parent.mqttbroker.handle(socket.serialtunnel);

View File

@ -1,6 +1,6 @@
/**
* @description MQTT broker reference implementation based on AEDES
* @author Joko Banu Sastriawan
* @author Joko Banu Sastriawan, Ylian Saint-Hilaire
* @copyright Intel Corporation 2018-2019
* @license Apache-2.0
* @version v0.0.1
@ -8,44 +8,94 @@
module.exports.CreateMQTTBroker = function (parent, db, args) {
// Internal objects container
var obj = {}
obj.parent = parent;
obj.db = db;
obj.args = args;
obj.aedes = require("aedes")();
obj.handle = obj.aedes.handle;
obj.connections = {}; // NodesID --> client array
// argument parsing -- tbd
// event handling and filtering
// authentication filter
// Connection Authentication
obj.aedes.authenticate = function (client, username, password, callback) {
// TODO: add authentication handler
obj.parent.debug("mqtt", "Authentication with " + username + ":" + password);
callback(null, true);
obj.parent.debug("mqtt", "Authentication with " + username + ":" + password + ":" + client.id + ", " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip));
var usersplit = username.split(':');
if (usersplit.length != 5) { callback(null, false); return; }
// Setup the identifiers
var xnodeid = usersplit[1];
var xmeshid = usersplit[2];
var xdomainid = usersplit[3];
// Convert meshid from HEX to Base64 if needed
if (xmeshid.length == 96) { xmeshid = Buffer.from(xmeshid, 'hex').toString('base64'); }
if ((xmeshid.length != 64) || (xnodeid.length != 64)) { callback(null, false); return; }
client.xdbNodeKey = 'node/' + xdomainid + '/' + xnodeid;
client.xdbMeshKey = 'mesh/' + xdomainid + '/' + xmeshid;
// Check if this node exists in the database
db.Get(client.xdbNodeKey, function (err, nodes) {
if ((nodes == null) || (nodes.length != 1)) { callback(null, false); return; } // Node does not exist
// If this device now has a different meshid, fix it here.
client.xdbMeshKey = nodes[0].meshid;
if (obj.connections[client.xdbNodeKey] == null) {
obj.connections[client.xdbNodeKey] = [client];
parent.SetConnectivityState(client.xdbMeshKey, client.xdbNodeKey, Date.now(), 16, 7); // Indicate this node has a MQTT connection, 7 = Present state
} else {
obj.connections[client.xdbNodeKey].push(client);
}
// check if a client can publish a packet
client.conn.parent = client;
client.conn.on('end', function () {
// client is "this.parent"
obj.parent.debug("mqtt", "Connection closed, " + this.parent.conn.xtransport + "://" + cleanRemoteAddr(this.parent.conn.xip));
// Remove this client from the connections list
if ((this.parent.xdbNodeKey != null) && (obj.connections[this.parent.xdbNodeKey] != null)) {
var clients = obj.connections[this.parent.xdbNodeKey], i = clients.indexOf(client);
if (i >= 0) {
if (clients.length == 1) {
delete obj.connections[this.parent.xdbNodeKey];
parent.ClearConnectivityState(this.parent.xdbMeshKey, this.parent.xdbNodeKey, 16); // Remove the MQTT connection for this node
} else { clients.splice(i, 1); }
}
}
this.parent.close();
});
callback(null, true);
});
}
// Check if a client can publish a packet
obj.aedes.authorizePublish = function (client, packet, callback) {
// TODO: add authorized publish control
obj.parent.debug("mqtt", "AuthorizePublish");
obj.parent.debug("mqtt", "AuthorizePublish, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip));
callback(null);
}
// Check if a client can publish a packet
obj.aedes.authorizeSubscribe = function (client, sub, callback) {
// TODO: add subscription control here
obj.parent.debug("mqtt", "AuthorizeSubscribe");
obj.parent.debug("mqtt", "AuthorizeSubscribe, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip));
callback(null, sub);
}
// Check if a client can publish a packet
obj.aedes.authorizeForward = function (client, packet) {
// TODO: add forwarding control
obj.parent.debug("mqtt", "AuthorizeForward");
obj.parent.debug("mqtt", "AuthorizeForward, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip));
//return packet;
return packet;
}
obj.handle = obj.aedes.handle;
// Clean a IPv6 address that encodes a IPv4 address
function cleanRemoteAddr(addr) { if (typeof addr != 'string') { return null; } if (addr.indexOf('::ffff:') == 0) { return addr.substring(7); } else { return addr; } }
return obj;
}

View File

@ -1,6 +1,6 @@
{
"name": "meshcentral",
"version": "0.4.1-s",
"version": "0.4.1-t",
"keywords": [
"Remote Management",
"Intel AMT",

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1771,6 +1771,7 @@
if ((node.conn & 2) != 0) { states.push('<span title="Intel&reg; AMT CIRA is connected and ready for use.">CIRA</span>'); }
else if ((node.conn & 4) != 0) { states.push('<span title="Intel&reg; AMT is routable.">Intel&reg; AMT</span>'); }
if ((node.conn & 8) != 0) { states.push('<span title="Mesh agent is reachable using another agent as relay.">Relay</span>'); }
if ((node.conn & 16) != 0) { states.push('<span title="MQTT connection to the device is active.">MQTT</span>'); }
}
if ((node.pwr != null) && (node.pwr != 0)) { states.push(powerStateStrings[node.pwr]); }
return states.join(', ');
@ -1945,6 +1946,7 @@
if ((node.conn & 2) != 0) cstate.push('<span title="Intel&reg; AMT CIRA is connected and ready for use.">Intel&reg; AMT CIRA</span>');
else if ((node.conn & 4) != 0) cstate.push('<span title="Intel&reg; AMT is routable and ready for use.">Intel&reg; AMT</span>');
if ((node.conn & 8) != 0) cstate.push('<span title="Software agent is reachable using another agent as relay.">Agent Relay</span>');
if ((node.conn & 16) != 0) cstate.push('<span title="MQTT connection to the device is active.">MQTT</span>');
x += addDeviceAttribute('Connectivity', cstate.join(', '));
}
@ -1984,6 +1986,7 @@
if ((connectivity & 1) != 0) { if (powerstate.length > 0) { powerstate += ', '; } powerstate += '<span style=font-size:10px title="Agent connected">Mesh Agent</span>'; }
if ((connectivity & 2) != 0) { if (powerstate.length > 0) { powerstate += ', '; } powerstate += '<span style=font-size:10px title="Intel&reg; AMT connected">Intel&reg; AMT connected</span>'; }
else if ((connectivity & 4) != 0) { if (powerstate.length > 0) { powerstate += ', '; } powerstate += '<span style=font-size:10px title="Intel&reg; AMT detected">Intel&reg; AMT detected</span>'; }
if ((connectivity & 16) != 0) { if (powerstate.length > 0) { powerstate += '<br/>'; } powerstate += '<span style=font-size:12px title="MQTT connected">MQTT channel connected</span>'; }
QH('MainComputerState', powerstate);
// Set the node icon

View File

@ -2151,11 +2151,13 @@
if (((node.conn & 1) == 0) && ((message.event.conn & 1) != 0)) { addNotification({ text: 'Agent connected', title: node.name, icon: node.icon, nodeid: node._id }); }
if (((node.conn & 2) == 0) && ((message.event.conn & 2) != 0)) { addNotification({ text: 'Intel AMT detected', title: node.name, icon: node.icon, nodeid: node._id }); }
if (((node.conn & 4) == 0) && ((message.event.conn & 4) != 0)) { addNotification({ text: 'Intel AMT CIRA connected', title: node.name, icon: node.icon, nodeid: node._id }); }
if (((node.conn & 16) == 0) && ((message.event.conn & 16) != 0)) { addNotification({ text: 'MQTT connected', title: node.name, icon: node.icon, nodeid: node._id }); }
}
if (n & 4) {
if (((node.conn & 1) != 0) && ((message.event.conn & 1) == 0)) { addNotification({ text: 'Agent disconnected', title: node.name, icon: node.icon, nodeid: node._id }); }
if (((node.conn & 2) != 0) && ((message.event.conn & 2) == 0)) { addNotification({ text: 'Intel AMT not detected', title: node.name, icon: node.icon, nodeid: node._id }); }
if (((node.conn & 4) != 0) && ((message.event.conn & 4) == 0)) { addNotification({ text: 'Intel AMT CIRA disconnected', title: node.name, icon: node.icon, nodeid: node._id }); }
if (((node.conn & 16) != 0) && ((message.event.conn & 16) == 0)) { addNotification({ text: 'MQTT disconnected', title: node.name, icon: node.icon, nodeid: node._id }); }
}
// Change the node connection state
@ -2569,6 +2571,7 @@
if ((node.conn & 2) != 0) { states.push('<span title="Intel&reg; AMT CIRA is connected and ready for use.">CIRA</span>'); }
else if ((node.conn & 4) != 0) { states.push('<span title="Intel&reg; AMT is routable.">AMT</span>'); }
if ((node.conn & 8) != 0) { states.push('<span title="Mesh agent is reachable using another agent as relay.">Relay</span>'); }
if ((node.conn & 16) != 0) { states.push('<span title="MQTT connection to the device is active.">MQTT</span>'); }
}
r += '<tr><td><div id=devs class=bar18 tabindex=0 onmouseover=devMouseHover(this,1) onmouseout=devMouseHover(this,0) style=height:18px;width:100%;font-size:medium onkeypress="if (event.key==\'Enter\') gotoDevice(\'' + node._id + '\',null,null,event)">';
r += '<div class=deviceBarCheckbox><input class="' + node.meshid + ' DeviceCheckbox" onclick=p1updateInfo() value=devid_' + node._id + ' type=checkbox></div>';
@ -3221,6 +3224,7 @@
if ((node.conn & 2) != 0) { states.push('<span title="Intel&reg; AMT CIRA is connected and ready for use.">CIRA</span>'); }
else if ((node.conn & 4) != 0) { states.push('<span title="Intel&reg; AMT is routable.">Intel&reg; AMT</span>'); }
if ((node.conn & 8) != 0) { states.push('<span title="Mesh agent is reachable using another agent as relay.">Relay</span>'); }
if ((node.conn & 16) != 0) { states.push('<span title="MQTT connection to the device is active.">MQTT</span>'); }
}
if ((node.pwr != null) && (node.pwr != 0)) { states.push(powerStateStrings[node.pwr]); }
return states.join(', ');
@ -4233,6 +4237,7 @@
if ((node.conn & 2) != 0) cstate.push('<span title="Intel&reg; AMT CIRA is connected and ready for use.">Intel&reg; AMT CIRA</span>');
else if ((node.conn & 4) != 0) cstate.push('<span title="Intel&reg; AMT is routable and ready for use.">Intel&reg; AMT</span>');
if ((node.conn & 8) != 0) cstate.push('<span title="Mesh agent is reachable using another agent as relay.">Mesh Relay</span>');
if ((node.conn & 16) != 0) { cstate.push('<span title="MQTT connection to the device is active.">MQTT</span>'); }
x += addDeviceAttribute('Connectivity', cstate.join(', '));
}
@ -4285,6 +4290,7 @@
if ((connectivity & 1) != 0) { if (powerstate.length > 0) { powerstate += '<br/>'; } powerstate += '<span style=font-size:12px title="Agent connected">Agent connected</span>'; }
if ((connectivity & 2) != 0) { if (powerstate.length > 0) { powerstate += '<br/>'; } powerstate += '<span style=font-size:12px title="Intel&reg; AMT connected">Intel&reg; AMT connected</span>'; }
else if ((connectivity & 4) != 0) { if (powerstate.length > 0) { powerstate += '<br/>'; } powerstate += '<span style=font-size:12px title="Intel&reg; AMT detected">Intel&reg; AMT detected</span>'; }
if ((connectivity & 16) != 0) { if (powerstate.length > 0) { powerstate += '<br/>'; } powerstate += '<span style=font-size:12px title="MQTT connected">MQTT channel connected</span>'; }
if ((powerstate == '') && node.lastconnect) { powerstate = '<span style=font-size:12px>Last seen:<br />' + printDateTime(new Date(node.lastconnect)) + '</span>'; }
QH('MainComputerState', powerstate);
@ -7136,6 +7142,7 @@
if (xxdialogMode) return;
var x = '', consent = (currentMesh.consent) ? currentMesh.consent : 0;
x += '<div style="width:100%;border-bottom:1px solid gray;margin-bottom:5px"><b>Desktop</b></div>';
if (debugmode) { x += "<div><label><input type=checkbox id=d20flag2 " + ((consent & 0x0040) ? 'checked' : '') + ">Show connection toolbar</label></div>"; }
x += "<div><label><input type=checkbox id=d20flag1 " + ((consent & 0x0001) ? 'checked' : '') + ">Notify user</label></div>";
x += "<div><label><input type=checkbox id=d20flag2 " + ((consent & 0x0008) ? 'checked' : '') + ">Prompt for user consent</label></div>";
x += '<div style="width:100%;border-bottom:1px solid gray;margin-bottom:5px;margin-top:8px"><b>Terminal</b></div>';

View File

@ -3326,11 +3326,16 @@ module.exports.CreateWebServer = function (parent, db, args, certificates) {
// Setup MQTT broker over websocket
if (obj.parent.mqttbroker != null) {
obj.app.ws(url + 'mqtt.ashx', function (ws, req) {
var ser = SerialTunnel();
ws.on('message', function (b) { ser.updateBuffer(Buffer.from(b, 'binary')) });
ser.forwardwrite = function (b) { ws.send(b, "binary") }
ws.on("close", function () { ser.emit('end'); });
obj.parent.mqttbroker.handle(ser); // Pass socket wrapper to MQTT broker
var domain = checkAgentIpAddress(ws, req);
if (domain == null) { parent.debug('web', 'Got agent connection from blocked IP address ' + cleanRemoteAddr(req.ip) + ', holding.'); return; }
var serialtunnel = SerialTunnel();
serialtunnel.xtransport = 'ws';
serialtunnel.xdomain = domain;
serialtunnel.xip = req.ip;
ws.on('message', function (b) { serialtunnel.updateBuffer(Buffer.from(b, 'binary')) });
serialtunnel.forwardwrite = function (b) { ws.send(b, "binary") }
ws.on("close", function () { serialtunnel.emit('end'); });
obj.parent.mqttbroker.handle(serialtunnel); // Pass socket wrapper to MQTT broker
});
}