diff --git a/mqttbroker.js b/mqttbroker.js index 238a6a74..16650d0d 100644 --- a/mqttbroker.js +++ b/mqttbroker.js @@ -12,9 +12,13 @@ module.exports.CreateMQTTBroker = function (parent, db, args) { obj.parent = parent; obj.db = db; obj.args = args; - obj.aedes = require("aedes")(); - obj.handle = obj.aedes.handle; obj.connections = {}; // NodesID --> client array + const aedes = require("aedes")(); + obj.handle = aedes.handle; + const allowedSubscriptionTopics = [ 'presence' ]; + const denyError = new Error('denied'); + var authError = new Error('Auth error') + authError.returnCode = 1 // Generate a username and password for MQTT login obj.generateLogin = function (meshid, nodeid) { @@ -25,19 +29,28 @@ module.exports.CreateMQTTBroker = function (parent, db, args) { return { meshid: meshid, nodeid: nodeid, user: username, pass: parent.config.settings.mqtt.auth.keyid + ':' + nonce + ':' + parent.crypto.createHash('sha384').update(username + ':' + nonce + ':' + parent.config.settings.mqtt.auth.key).digest("base64") }; } + // Publish a message to a specific nodeid & topic + obj.publish = function (nodeid, topic, message) { + var clients = obj.connections[nodeid]; + if (clients == null) return; + if (typeof message == 'string') { message = new Buffer(message); } + for (var i in clients) { clients[i].publish({ cmd: 'publish', qos: 0, topic: topic, payload: message, retain: false }); } + } + // Connection Authentication - obj.aedes.authenticate = function (client, username, password, callback) { + aedes.authenticate = function (client, username, password, callback) { obj.parent.debug("mqtt", "Authentication User:" + username + ", Pass:" + password.toString() + ", ClientID:" + client.id + ", " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); + console.log('MQTT Connect'); // Parse the username and password var usersplit = username.split(':'); var passsplit = password.toString().split(':'); - if ((usersplit.length !== 4) || (passsplit.length !== 3)) { obj.parent.debug("mqtt", "Invalid user/pass format, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(null, false); return; } - if (usersplit[0] !== 'MCAuth1') { obj.parent.debug("mqtt", "Invalid auth method, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(null, false); return; } + if ((usersplit.length !== 4) || (passsplit.length !== 3)) { obj.parent.debug("mqtt", "Invalid user/pass format, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(authError, null); return; } + if (usersplit[0] !== 'MCAuth1') { obj.parent.debug("mqtt", "Invalid auth method, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(authError, null); return; } // Check authentication - if (passsplit[0] !== parent.config.settings.mqtt.auth.keyid) { obj.parent.debug("mqtt", "Invalid auth keyid, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(null, false); return; } - if (parent.crypto.createHash('sha384').update(username + ':' + passsplit[1] + ':' + parent.config.settings.mqtt.auth.key).digest("base64") !== passsplit[2]) { obj.parent.debug("mqtt", "Invalid password, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(null, false); return; } + if (passsplit[0] !== parent.config.settings.mqtt.auth.keyid) { obj.parent.debug("mqtt", "Invalid auth keyid, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(authError, null); return; } + if (parent.crypto.createHash('sha384').update(username + ':' + passsplit[1] + ':' + parent.config.settings.mqtt.auth.key).digest("base64") !== passsplit[2]) { obj.parent.debug("mqtt", "Invalid password, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); callback(authError, null); return; } // Setup the identifiers const xnodeid = usersplit[1]; @@ -49,16 +62,15 @@ module.exports.CreateMQTTBroker = function (parent, db, args) { // Convert meshid from HEX to Base64 if needed if (xmeshid.length === 96) { xmeshid = Buffer.from(xmeshid, 'hex').toString('base64'); } - if ((xmeshid.length !== 64) || (xnodeid.length != 64)) { callback(null, false); return; } + if ((xmeshid.length !== 64) || (xnodeid.length != 64)) { callback(authError, null); return; } + // Set the client nodeid and meshid client.xdbNodeKey = 'node/' + xdomainid + '/' + xnodeid; client.xdbMeshKey = 'mesh/' + xdomainid + '/' + xmeshid; - //console.log(obj.generateLogin(client.xdbMeshKey, client.xdbNodeKey)); - // Check if this node exists in the database db.Get(client.xdbNodeKey, function (err, nodes) { - if ((nodes == null) || (nodes.length != 1)) { callback(null, false); return; } // Node does not exist + if ((nodes == null) || (nodes.length != 1)) { callback(authError, null); return; } // Node does not exist // If this device now has a different meshid, fix it here. client.xdbMeshKey = nodes[0].meshid; @@ -93,27 +105,33 @@ module.exports.CreateMQTTBroker = function (parent, db, args) { } // Check if a client can publish a packet - obj.aedes.authorizePublish = function (client, packet, callback) { - // TODO: add authorized publish control - //console.log(packet); - obj.parent.debug("mqtt", "AuthorizePublish, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); - callback(null); + aedes.authorizeSubscribe = function (client, sub, callback) { + // Subscription control + obj.parent.debug("mqtt", "AuthorizeSubscribe \"" + sub.topic + "\", " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); + if (allowedSubscriptionTopics.indexOf(sub.topic) === -1) { sub = null; } // If not a supported subscription, deny it. + callback(null, sub); // We authorize supported topics, but will not allow agents to publish anything to other agents. } // Check if a client can publish a packet - obj.aedes.authorizeSubscribe = function (client, sub, callback) { - // TODO: add subscription control here - obj.parent.debug("mqtt", "AuthorizeSubscribe \"" + sub.topic + "\", " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); - callback(null, sub); + aedes.authorizePublish = function (client, packet, callback) { + // Handle a published message + obj.parent.debug("mqtt", "AuthorizePublish, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); + handleMessage(client.xdbNodeKey, client.xdbNodeKey, packet.topic, packet.payload); + //callback(denyError); // Deny all, clients can't publish anything to other agents. + //callback(null); // Deny all, clients can't publish anything to other agents. } // Check if a client can forward a packet - obj.aedes.authorizeForward = function (client, packet) { + //aedes.authorizeForward = function (client, packet) { // TODO: add forwarding control - //console.log(packet); - obj.parent.debug("mqtt", "AuthorizeForward, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); + //obj.parent.debug("mqtt", "AuthorizeForward, " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip)); //return packet; - return packet; + //} + + // Handle messages coming from clients + function handleMessage(nodeid, meshid, topic, message) { + console.log('handleMessage', nodeid, topic, message.toString()); + obj.publish(nodeid, 'abc', "This is a server reply"); } // Clean a IPv6 address that encodes a IPv4 address