2019-10-03 03:57:18 +03:00
/ * *
* @ description MQTT broker reference implementation based on AEDES
2019-10-05 03:24:30 +03:00
* @ author Joko Banu Sastriawan , Ylian Saint - Hilaire
2019-10-03 03:57:18 +03:00
* @ copyright Intel Corporation 2018 - 2019
* @ license Apache - 2.0
* @ version v0 . 0.1
* /
module . exports . CreateMQTTBroker = function ( parent , db , args ) {
2019-10-05 03:24:30 +03:00
2019-10-03 03:57:18 +03:00
var obj = { }
obj . parent = parent ;
obj . db = db ;
obj . args = args ;
2019-10-05 03:24:30 +03:00
obj . connections = { } ; // NodesID --> client array
2019-10-08 01:00:36 +03:00
const aedes = require ( "aedes" ) ( ) ;
obj . handle = aedes . handle ;
const allowedSubscriptionTopics = [ 'presence' ] ;
const denyError = new Error ( 'denied' ) ;
var authError = new Error ( 'Auth error' )
authError . returnCode = 1
2019-10-05 03:24:30 +03:00
2019-10-06 00:24:40 +03:00
// Generate a username and password for MQTT login
obj . generateLogin = function ( meshid , nodeid ) {
const meshidsplit = meshid . split ( '/' ) , nodeidsplit = nodeid . split ( '/' ) ;
const xmeshid = meshidsplit [ 2 ] , xnodeid = nodeidsplit [ 2 ] , xdomainid = meshidsplit [ 1 ] ;
const username = 'MCAuth1:' + xnodeid + ':' + xmeshid + ':' + xdomainid ;
const nonce = Buffer . from ( parent . crypto . randomBytes ( 9 ) , 'binary' ) . toString ( 'base64' ) ;
return { meshid : meshid , nodeid : nodeid , user : username , pass : parent . config . settings . mqtt . auth . keyid + ':' + nonce + ':' + parent . crypto . createHash ( 'sha384' ) . update ( username + ':' + nonce + ':' + parent . config . settings . mqtt . auth . key ) . digest ( "base64" ) } ;
}
2019-10-05 03:24:30 +03:00
// Connection Authentication
2019-10-08 01:00:36 +03:00
aedes . authenticate = function ( client , username , password , callback ) {
2019-10-06 00:24:40 +03:00
obj . parent . debug ( "mqtt" , "Authentication User:" + username + ", Pass:" + password . toString ( ) + ", ClientID:" + client . id + ", " + client . conn . xtransport + "://" + cleanRemoteAddr ( client . conn . xip ) ) ;
2019-10-05 03:24:30 +03:00
2019-10-06 00:24:40 +03:00
// Parse the username and password
2019-10-05 03:24:30 +03:00
var usersplit = username . split ( ':' ) ;
2019-10-06 00:24:40 +03:00
var passsplit = password . toString ( ) . split ( ':' ) ;
2019-10-08 01:00:36 +03:00
if ( ( usersplit . length !== 4 ) || ( passsplit . length !== 3 ) ) { obj . parent . debug ( "mqtt" , "Invalid user/pass format, " + client . conn . xtransport + "://" + cleanRemoteAddr ( client . conn . xip ) ) ; callback ( authError , null ) ; return ; }
if ( usersplit [ 0 ] !== 'MCAuth1' ) { obj . parent . debug ( "mqtt" , "Invalid auth method, " + client . conn . xtransport + "://" + cleanRemoteAddr ( client . conn . xip ) ) ; callback ( authError , null ) ; return ; }
2019-10-06 00:24:40 +03:00
// Check authentication
2019-10-08 01:00:36 +03:00
if ( passsplit [ 0 ] !== parent . config . settings . mqtt . auth . keyid ) { obj . parent . debug ( "mqtt" , "Invalid auth keyid, " + client . conn . xtransport + "://" + cleanRemoteAddr ( client . conn . xip ) ) ; callback ( authError , null ) ; return ; }
if ( parent . crypto . createHash ( 'sha384' ) . update ( username + ':' + passsplit [ 1 ] + ':' + parent . config . settings . mqtt . auth . key ) . digest ( "base64" ) !== passsplit [ 2 ] ) { obj . parent . debug ( "mqtt" , "Invalid password, " + client . conn . xtransport + "://" + cleanRemoteAddr ( client . conn . xip ) ) ; callback ( authError , null ) ; return ; }
2019-10-05 03:24:30 +03:00
// Setup the identifiers
2019-10-06 00:24:40 +03:00
const xnodeid = usersplit [ 1 ] ;
2019-10-05 03:24:30 +03:00
var xmeshid = usersplit [ 2 ] ;
2019-10-06 00:24:40 +03:00
const xdomainid = usersplit [ 3 ] ;
// Check the domain
if ( ( typeof client . conn . xdomain == 'object' ) && ( xdomainid != client . conn . xdomain . id ) ) { obj . parent . debug ( "mqtt" , "Invalid domain connection, " + client . conn . xtransport + "://" + cleanRemoteAddr ( client . conn . xip ) ) ; callback ( null , false ) ; return ; }
2019-10-05 03:24:30 +03:00
// Convert meshid from HEX to Base64 if needed
2019-10-06 00:24:40 +03:00
if ( xmeshid . length === 96 ) { xmeshid = Buffer . from ( xmeshid , 'hex' ) . toString ( 'base64' ) ; }
2019-10-08 01:00:36 +03:00
if ( ( xmeshid . length !== 64 ) || ( xnodeid . length != 64 ) ) { callback ( authError , null ) ; return ; }
2019-10-05 03:24:30 +03:00
2019-10-08 01:00:36 +03:00
// Set the client nodeid and meshid
2019-10-05 03:24:30 +03:00
client . xdbNodeKey = 'node/' + xdomainid + '/' + xnodeid ;
client . xdbMeshKey = 'mesh/' + xdomainid + '/' + xmeshid ;
// Check if this node exists in the database
db . Get ( client . xdbNodeKey , function ( err , nodes ) {
2019-10-08 01:00:36 +03:00
if ( ( nodes == null ) || ( nodes . length != 1 ) ) { callback ( authError , null ) ; return ; } // Node does not exist
2019-10-05 03:24:30 +03:00
// If this device now has a different meshid, fix it here.
client . xdbMeshKey = nodes [ 0 ] . meshid ;
if ( obj . connections [ client . xdbNodeKey ] == null ) {
obj . connections [ client . xdbNodeKey ] = [ client ] ;
parent . SetConnectivityState ( client . xdbMeshKey , client . xdbNodeKey , Date . now ( ) , 16 , 7 ) ; // Indicate this node has a MQTT connection, 7 = Present state
} else {
obj . connections [ client . xdbNodeKey ] . push ( client ) ;
}
client . conn . parent = client ;
client . conn . on ( 'end' , function ( ) {
// client is "this.parent"
obj . parent . debug ( "mqtt" , "Connection closed, " + this . parent . conn . xtransport + "://" + cleanRemoteAddr ( this . parent . conn . xip ) ) ;
// Remove this client from the connections list
if ( ( this . parent . xdbNodeKey != null ) && ( obj . connections [ this . parent . xdbNodeKey ] != null ) ) {
var clients = obj . connections [ this . parent . xdbNodeKey ] , i = clients . indexOf ( client ) ;
if ( i >= 0 ) {
if ( clients . length == 1 ) {
delete obj . connections [ this . parent . xdbNodeKey ] ;
parent . ClearConnectivityState ( this . parent . xdbMeshKey , this . parent . xdbNodeKey , 16 ) ; // Remove the MQTT connection for this node
} else { clients . splice ( i , 1 ) ; }
}
}
this . parent . close ( ) ;
} ) ;
callback ( null , true ) ;
} ) ;
2019-10-03 03:57:18 +03:00
}
2019-10-05 03:24:30 +03:00
// Check if a client can publish a packet
2019-10-08 01:00:36 +03:00
aedes . authorizeSubscribe = function ( client , sub , callback ) {
// Subscription control
obj . parent . debug ( "mqtt" , "AuthorizeSubscribe \"" + sub . topic + "\", " + client . conn . xtransport + "://" + cleanRemoteAddr ( client . conn . xip ) ) ;
if ( allowedSubscriptionTopics . indexOf ( sub . topic ) === - 1 ) { sub = null ; } // If not a supported subscription, deny it.
callback ( null , sub ) ; // We authorize supported topics, but will not allow agents to publish anything to other agents.
2019-10-03 03:57:18 +03:00
}
2019-10-05 03:24:30 +03:00
2019-10-04 22:18:56 +03:00
// Check if a client can publish a packet
2019-10-08 01:00:36 +03:00
aedes . authorizePublish = function ( client , packet , callback ) {
// Handle a published message
obj . parent . debug ( "mqtt" , "AuthorizePublish, " + client . conn . xtransport + "://" + cleanRemoteAddr ( client . conn . xip ) ) ;
2019-10-08 21:08:41 +03:00
handleMessage ( client . xdbNodeKey , client . xdbMeshKey , packet . topic , packet . payload ) ;
2019-10-08 04:11:15 +03:00
// We don't accept that any client message be published, so don't call the callback.
2019-10-03 03:57:18 +03:00
}
2019-10-05 03:24:30 +03:00
2019-10-08 04:11:15 +03:00
// Publish a message to a specific nodeid & topic, also send this to peer servers.
obj . publish = function ( nodeid , topic , message ) {
// Publish this message on peer servers.
if ( parent . multiServer != null ) { parent . multiServer . DispatchMessage ( JSON . stringify ( { action : 'mqtt' , nodeid : nodeid , topic : topic , message : message } ) ) ; }
obj . publishNoPeers ( nodeid , topic , message ) ;
}
// Publish a message to a specific nodeid & topic, don't send to peer servers.
obj . publishNoPeers = function ( nodeid , topic , message ) {
// Look for any MQTT connections to send this to
var clients = obj . connections [ nodeid ] ;
if ( clients == null ) return ;
if ( typeof message == 'string' ) { message = new Buffer ( message ) ; }
for ( var i in clients ) { clients [ i ] . publish ( { cmd : 'publish' , qos : 0 , topic : topic , payload : message , retain : false } ) ; }
}
2019-10-08 01:00:36 +03:00
// Handle messages coming from clients
function handleMessage ( nodeid , meshid , topic , message ) {
2019-10-08 21:08:41 +03:00
// Handle messages here
if ( topic == 'console' ) { routeMessage ( { action : 'msg' , type : 'console' , value : message . toString ( ) , source : 'MQTT' } , nodeid , meshid ) ; return ; } // Handle console messages
2019-10-08 04:11:15 +03:00
//console.log('handleMessage', nodeid, topic, message.toString());
//obj.publish(nodeid, 'echoTopic', "Echo: " + message.toString());
2019-10-03 03:57:18 +03:00
}
2019-10-04 22:18:56 +03:00
2019-10-05 03:24:30 +03:00
// Clean a IPv6 address that encodes a IPv4 address
function cleanRemoteAddr ( addr ) { if ( typeof addr != 'string' ) { return null ; } if ( addr . indexOf ( '::ffff:' ) == 0 ) { return addr . substring ( 7 ) ; } else { return addr ; } }
2019-10-08 21:08:41 +03:00
// Route a message
function routeMessage ( command , dbNodeKey , dbMeshKey ) {
// Route a message.
// If this command has a sessionid, that is the target.
if ( command . sessionid != null ) {
if ( typeof command . sessionid != 'string' ) return ;
var splitsessionid = command . sessionid . split ( '/' ) ;
// Check that we are in the same domain and the user has rights over this node.
if ( ( splitsessionid [ 0 ] == 'user' ) && ( splitsessionid [ 1 ] == domain . id ) ) {
// Check if this user has rights to get this message
//if (mesh.links[user._id] == null || ((mesh.links[user._id].rights & 16) == 0)) return; // TODO!!!!!!!!!!!!!!!!!!!!!
// See if the session is connected. If so, go ahead and send this message to the target node
var ws = parent . webserver . wssessions2 [ command . sessionid ] ;
if ( ws != null ) {
command . nodeid = dbNodeKey ; // Set the nodeid, required for responses.
delete command . sessionid ; // Remove the sessionid, since we are sending to that sessionid, so it's implyed.
try { ws . send ( JSON . stringify ( command ) ) ; } catch ( ex ) { }
} else if ( parent . multiServer != null ) {
// See if we can send this to a peer server
var serverid = parent . webserver . wsPeerSessions2 [ command . sessionid ] ;
if ( serverid != null ) {
command . fromNodeid = dbNodeKey ;
parent . multiServer . DispatchMessageSingleServer ( command , serverid ) ;
}
}
}
} else if ( command . userid != null ) { // If this command has a userid, that is the target.
if ( typeof command . userid != 'string' ) return ;
var splituserid = command . userid . split ( '/' ) ;
// Check that we are in the same domain and the user has rights over this node.
if ( ( splituserid [ 0 ] == 'user' ) && ( splituserid [ 1 ] == domain . id ) ) {
// Check if this user has rights to get this message
//if (mesh.links[user._id] == null || ((mesh.links[user._id].rights & 16) == 0)) return; // TODO!!!!!!!!!!!!!!!!!!!!!
// See if the session is connected
var sessions = parent . webserver . wssessions [ command . userid ] ;
// Go ahead and send this message to the target node
if ( sessions != null ) {
command . nodeid = dbNodeKey ; // Set the nodeid, required for responses.
delete command . userid ; // Remove the userid, since we are sending to that userid, so it's implyed.
for ( i in sessions ) { sessions [ i ] . send ( JSON . stringify ( command ) ) ; }
}
if ( parent . multiServer != null ) {
// TODO: Add multi-server support
}
}
} else { // Route this command to the mesh
command . nodeid = dbNodeKey ;
var cmdstr = JSON . stringify ( command ) ;
for ( var userid in parent . webserver . wssessions ) { // Find all connected users for this mesh and send the message
var user = parent . webserver . users [ userid ] ;
if ( ( user != null ) && ( user . links != null ) ) {
var rights = user . links [ dbMeshKey ] ;
if ( rights != null ) { // TODO: Look at what rights are needed for message routing
var xsessions = parent . webserver . wssessions [ userid ] ;
// Send the message to all users on this server
for ( i in xsessions ) { try { xsessions [ i ] . send ( cmdstr ) ; } catch ( e ) { } }
}
}
}
// Send the message to all users of other servers
if ( parent . multiServer != null ) {
delete command . nodeid ;
command . fromNodeid = dbNodeKey ;
command . meshid = dbMeshKey ;
parent . multiServer . DispatchMessage ( command ) ;
}
}
}
2019-10-04 22:18:56 +03:00
return obj ;
2019-10-03 03:57:18 +03:00
}