From 96da14b1220481602167470e76ac66d63b7f64a3 Mon Sep 17 00:00:00 2001 From: jsastriawan Date: Wed, 2 Oct 2019 17:57:18 -0700 Subject: [PATCH] Added MQTT support over WSS and multiplexed with MPS --- meshcentral.js | 4 ++++ mpsserver.js | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++ mqttbroker.js | 54 +++++++++++++++++++++++++++++++++++++++++++ package.json | 3 +++ webserver.js | 10 ++++++++ 5 files changed, 134 insertions(+) create mode 100644 mqttbroker.js diff --git a/meshcentral.js b/meshcentral.js index a9da0dca..6b041094 100644 --- a/meshcentral.js +++ b/meshcentral.js @@ -27,6 +27,7 @@ function CreateMeshCentralServer(config, args) { obj.redirserver = null; obj.mpsserver = null; obj.apfserver = null; + obj.mqttbroker = null; obj.swarmserver = null; obj.mailserver = null; obj.amtEventHandler = null; @@ -822,6 +823,9 @@ function CreateMeshCentralServer(config, args) { // Create APF server to hook into webserver obj.apfserver = require('./apfserver.js').CreateApfServer(obj, obj.db, obj.args); + // Create MQTT Broker to hook into webserver and mpsserver + obj.mqttbroker = require("./mqttbroker.js").CreateMQTTBroker(obj,obj.db,obj.args); + // Start the web server and if needed, the redirection web server. obj.webserver = require('./webserver.js').CreateWebServer(obj, obj.db, obj.args, obj.certificates); if (obj.redirserver != null) { obj.redirserver.hookMainWebServer(obj.certificates); } diff --git a/mpsserver.js b/mpsserver.js index 764fd28e..9c77e3f1 100644 --- a/mpsserver.js +++ b/mpsserver.js @@ -159,6 +159,44 @@ module.exports.CreateMpsServer = function (parent, db, args, certificates) { }; } + // required for TLS piping to MQTT broker + function SerialTunnel(options) { + var obj = new require('stream').Duplex(options); + obj.forwardwrite = null; + obj.updateBuffer = function (chunk) { this.push(chunk); }; + obj._write = function (chunk, encoding, callback) { if (obj.forwardwrite != null) { obj.forwardwrite(chunk); } else { console.err("Failed to fwd _write."); } if (callback) callback(); }; // Pass data written to forward + obj._read = function (size) { }; // Push nothing, anything to read should be pushed from updateBuffer() + return obj; + } + + function getMQTTPacketLength(chunk) { + var packet_len = 0; + if (chunk.readUInt8(0)==16) { + if (chunk.readUInt8(1) < 128 ) { + packet_len += chunk.readUInt8(1) + 2; + } else { + // continuation bit, get real value and do next + packet_len += (chunk.readUInt8(1) & 0x7F) + 2; + if (chunk.readUInt8(2) < 128) { + packet_len += 1 + chunk.readUInt8(2) * 128; + } else { + packet_len += 1 + (chunk.readUInt8(2) & 0x7F) * 128; + if (chunk.readUInt8(3) < 128) { + packet_len += 1 + chunk.readUInt8(3) * 128 * 128; + } else { + packet_len += 1 + (chunk.readUInt8(3) & 0x7F) * 128 * 128; + if (chunk.readUInt8(4) < 128) { + packet_len += 1 + chunk.readUInt8(4) * 128 * 128 * 128; + } else { + packet_len += 1 + (chunk.readUInt8(4) & 0x7F) * 128* 128 * 128; + } + } + } + } + } + return packet_len; + } + function onConnection(socket) { connectionCount++; if (obj.args.mpstlsoffload) { @@ -182,6 +220,31 @@ module.exports.CreateMpsServer = function (parent, db, args, certificates) { if (socket.tag.accumulator.length < 3) return; //if (!socket.tag.clientCert.subject) { console.log("MPS Connection, no client cert: " + socket.remoteAddress); socket.write('HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\nMeshCentral2 MPS server.\r\nNo client certificate given.'); socket.end(); return; } if (socket.tag.accumulator.substring(0, 3) == "GET") { if (args.mpsdebug) { console.log("MPS Connection, HTTP GET detected: " + socket.remoteAddress); } socket.write("HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\n\r\nMeshCentral2 MPS server.
Intel® AMT computers should connect here."); socket.end(); return; } + + var chunk = Buffer.from(socket.tag.accumulator,"binary"); + var packet_len = 0; + if (chunk.readUInt8(0)==16) { + packet_len = getMQTTPacketLength(chunk); + } + + if (chunk.readUInt8(0)==16 && (socket.tag.accumulator.length < packet_len )) return;// minimum MQTT detection + + // check if it is MQTT, need more initial packet to probe + if (chunk.readUInt8(0) == 16 && ((chunk.slice(4, 8).toString() === "MQTT") || (chunk.slice(5, 9).toString() === "MQTT") + || (chunk.slice(6, 10).toString() === "MQTT") || (chunk.slice(7, 11).toString() === "MQTT"))) { + parent.debug("mps", "MQTT connection detected."); + socket.removeAllListeners("data"); + socket.removeAllListeners("close"); + socket.setNoDelay(true); + socket.serialtunnel = SerialTunnel(); + socket.on('data', function(b) { socket.serialtunnel.updateBuffer(Buffer.from(b,'binary'))}); + socket.serialtunnel.forwardwrite = function(b) { socket.write(b,"binary")} + socket.on("close", function() { socket.serialtunnel.emit('end');}); + //pass socket wrapper to mqtt broker + parent.mqttbroker.handle(socket.serialtunnel); + socket.unshift(socket.tag.accumulator); + return; + } socket.tag.first = false; // Setup this node with certificate authentication diff --git a/mqttbroker.js b/mqttbroker.js new file mode 100644 index 00000000..13f9de93 --- /dev/null +++ b/mqttbroker.js @@ -0,0 +1,54 @@ +/** +* @description MQTT broker reference implementation based on AEDES +* @author Joko Banu Sastriawan +* @copyright Intel Corporation 2018-2019 +* @license Apache-2.0 +* @version v0.0.1 +*/ + + +module.exports.CreateMQTTBroker = function (parent, db, args) { + + // internal objects container + var obj = {} + obj.parent = parent; + obj.db = db; + obj.args = args; + + obj.aedes = require("aedes")(); + + + // argument parsing -- tbd + + // event handling and filtering + // authentication filter + obj.aedes.authenticate = function (client, username, password, callback) { + // accept all user + // TODO: add authentication handler + obj.parent.debug("mqtt","Authentication with "+username+":"+password); + callback(null, true); + } + + // check if a client can publish a packet + obj.aedes.authorizePublish = function (client, packet, callback) { + //TODO: add authorized publish control + obj.parent.debug("mqtt","AuthorizePublish"); + callback(null); + } + + // check if a client can publish a packet + obj.aedes.authorizeSubscribe = function (client, sub, callback) { + //TODO: add subscription control here + obj.parent.debug("mqtt","AuthorizeSubscribe"); + callback(null, sub); + } + + // check if a client can publish a packet + obj.aedes.authorizeForward = function (client, packet) { + //TODO: add forwarding control + obj.parent.debug("mqtt","AuthorizeForward"); + return packet; + } + obj.handle = obj.aedes.handle; + return obj; +} diff --git a/package.json b/package.json index e5a6aa77..90af03fb 100644 --- a/package.json +++ b/package.json @@ -27,6 +27,7 @@ "sample-config.json" ], "dependencies": { + "aedes": "^0.39.0", "archiver": "^3.0.0", "body-parser": "^1.19.0", "cbor": "4.1.5", @@ -39,9 +40,11 @@ "ipcheck": "^0.1.0", "meshcentral": "*", "minimist": "^1.2.0", + "mqtt": "^3.0.0", "multiparty": "^4.2.1", "nedb": "^1.8.0", "node-forge": "^0.8.4", + "otplib": "^11.0.1", "ws": "^6.2.1", "xmldom": "^0.1.27", "yauzl": "^2.10.0" diff --git a/webserver.js b/webserver.js index 24a28c25..07889d4e 100644 --- a/webserver.js +++ b/webserver.js @@ -3323,6 +3323,16 @@ module.exports.CreateWebServer = function (parent, db, args, certificates) { try { obj.meshAgentHandler.CreateMeshAgent(obj, obj.db, ws, req, obj.args, domain); } catch (e) { console.log(e); } }); + // MQTT broker over websocket + obj.app.ws(url+'mqtt.ashx', function (ws, req) { + var ser = SerialTunnel(); + ws.on('message', function(b) { ser.updateBuffer(Buffer.from(b,'binary'))}); + ser.forwardwrite = function(b) { ws.send(b,"binary")} + ws.on("close", function() { ser.emit('end');}); + //pass socket wrapper to mqtt broker + obj.parent.mqttbroker.handle(ser); + }) + // Memory Tracking if (typeof obj.args.memorytracking == 'number') { obj.app.get(url + 'memorytracking.csv', function (req, res) {