2019-10-03 03:57:18 +03:00
/**
* @description MQTT broker reference implementation based on AEDES
* @author Joko Banu Sastriawan, Ylian Saint-Hilaire
* @copyright Intel Corporation 2018-2020
* @license Apache-2.0
* @version v0.0.1
*/
module . exports . CreateMQTTBroker = function ( parent , db , args ) {
2019-10-05 03:24:30 +03:00
2019-10-03 03:57:18 +03:00
// Broker instance backed by aedes; its connection handler is exposed as obj.handle.
const aedes = require("aedes")();

// Broker state object returned to the caller of CreateMQTTBroker.
var obj = {
    parent: parent,
    db: db,
    args: args,
    connections: {}, // NodesID --> client array
    handle: aedes.handle
};

// Only these topics may be subscribed to by connecting clients.
const allowedSubscriptionTopics = ['presence'];
const denyError = new Error('denied');

// Error handed to the aedes authenticate callback whenever a login is rejected.
// NOTE(review): returnCode is presumably used by aedes as the CONNACK return code - confirm that 1 is the intended value.
var authError = Object.assign(new Error('Auth error'), { returnCode: 1 });
2019-10-05 03:24:30 +03:00
2019-10-06 00:24:40 +03:00
// Generate a username and password for MQTT login
//   meshid, nodeid: '/'-separated identifiers; element [2] of each is used as the id and
//                   element [1] of meshid is used as the domain id.
//   Returns { meshid, nodeid, user, pass } where
//     user = "MCAuth1:<nodeid>:<meshid>:<domainid>"
//     pass = "<keyid>:<nonce>:<base64 sha384 of user:nonce:key>"
obj.generateLogin = function (meshid, nodeid) {
    const meshParts = meshid.split('/');
    const nodeParts = nodeid.split('/');
    const username = `MCAuth1:${nodeParts[2]}:${meshParts[2]}:${meshParts[1]}`;

    // Fresh 9 byte random nonce, base64 encoded
    const nonce = Buffer.from(parent.crypto.randomBytes(9), 'binary').toString('base64');

    // The password proves knowledge of the configured MQTT auth key
    const auth = parent.config.settings.mqtt.auth;
    const proof = parent.crypto.createHash('sha384').update(username + ':' + nonce + ':' + auth.key).digest("base64");

    return {
        meshid: meshid,
        nodeid: nodeid,
        user: username,
        pass: auth.keyid + ':' + nonce + ':' + proof
    };
};
2019-10-05 03:24:30 +03:00
// Connection Authentication
//   client:   connecting aedes client; client.conn carries xtransport, xip and optionally xdomain.
//   username: expected format "MCAuth1:<nodeid>:<meshid>:<domainid>".
//   password: expected format "<keyid>:<nonce>:<base64 sha384 hash>"; converted with toString() before parsing.
//   callback: called as callback(authError, null) when the login is rejected, callback(null, false) when the
//             connection arrived on the wrong domain, and callback(null, true) once the node is accepted.
// On success the client is tagged with xdbNodeKey / xdbMeshKey / xdomainid and added to obj.connections.
aedes.authenticate = function (client, username, password, callback) {
    const remote = client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip);

    // A client can connect without credentials; reject it here instead of throwing on username.split() / password.toString().
    if ((typeof username != 'string') || (password == null)) { obj.parent.debug("mqtt", "Missing user/pass, " + remote); callback(authError, null); return; }

    // NOTE(review): this debug line writes the complete password to the debug output - confirm this is acceptable.
    obj.parent.debug("mqtt", "Authentication User:" + username + ", Pass:" + password.toString() + ", ClientID:" + client.id + ", " + remote);

    // Parse the username and password
    const usersplit = username.split(':');
    const passsplit = password.toString().split(':');
    if ((usersplit.length !== 4) || (passsplit.length !== 3)) { obj.parent.debug("mqtt", "Invalid user/pass format, " + remote); callback(authError, null); return; }
    if (usersplit[0] !== 'MCAuth1') { obj.parent.debug("mqtt", "Invalid auth method, " + remote); callback(authError, null); return; }

    // Check authentication
    // NOTE(review): the hash is compared with !==, which is not a constant-time comparison.
    if (passsplit[0] !== parent.config.settings.mqtt.auth.keyid) { obj.parent.debug("mqtt", "Invalid auth keyid, " + remote); callback(authError, null); return; }
    if (parent.crypto.createHash('sha384').update(username + ':' + passsplit[1] + ':' + parent.config.settings.mqtt.auth.key).digest("base64") !== passsplit[2]) { obj.parent.debug("mqtt", "Invalid password, " + remote); callback(authError, null); return; }

    // Setup the identifiers
    const xnodeid = usersplit[1];
    var xmeshid = usersplit[2];
    const xdomainid = usersplit[3];

    // Check the domain
    if ((typeof client.conn.xdomain == 'object') && (xdomainid != client.conn.xdomain.id)) { obj.parent.debug("mqtt", "Invalid domain connection, " + remote); callback(null, false); return; }

    // Convert meshid from HEX to Base64 if needed
    if (xmeshid.length === 96) { xmeshid = Buffer.from(xmeshid, 'hex').toString('base64'); }
    if ((xmeshid.length !== 64) || (xnodeid.length != 64)) { callback(authError, null); return; }

    // Set the client nodeid and meshid
    client.xdbNodeKey = 'node/' + xdomainid + '/' + xnodeid;
    client.xdbMeshKey = 'mesh/' + xdomainid + '/' + xmeshid;
    client.xdomainid = xdomainid;

    // Check if this node exists in the database
    db.Get(client.xdbNodeKey, function (err, nodes) {
        if ((nodes == null) || (nodes.length != 1)) { callback(authError, null); return; } // Node does not exist

        // If this device now has a different meshid, fix it here.
        client.xdbMeshKey = nodes[0].meshid;

        // Track the connection; only the first connection of a node changes the connectivity state.
        if (obj.connections[client.xdbNodeKey] == null) {
            obj.connections[client.xdbNodeKey] = [client];
            parent.SetConnectivityState(client.xdbMeshKey, client.xdbNodeKey, Date.now(), 16, 7); // Indicate this node has a MQTT connection, 7 = Present state
        } else {
            obj.connections[client.xdbNodeKey].push(client);
        }

        // Back reference so the 'end' handler can reach the client through "this.parent"
        client.conn.parent = client;
        client.conn.on('end', function () {
            // client is "this.parent"
            obj.parent.debug("mqtt", "Connection closed, " + this.parent.conn.xtransport + "://" + cleanRemoteAddr(this.parent.conn.xip));

            // Remove this client from the connections list
            if ((this.parent.xdbNodeKey != null) && (obj.connections[this.parent.xdbNodeKey] != null)) {
                var clients = obj.connections[this.parent.xdbNodeKey], i = clients.indexOf(client);
                if (i >= 0) {
                    if (clients.length == 1) {
                        // Last connection of this node: drop the entry and clear the MQTT connectivity bit
                        delete obj.connections[this.parent.xdbNodeKey];
                        parent.ClearConnectivityState(this.parent.xdbMeshKey, this.parent.xdbNodeKey, 16); // Remove the MQTT connection for this node
                    } else { clients.splice(i, 1); }
                }
            }
            this.parent.close();
        });
        callback(null, true);
    });
};
2019-10-05 03:24:30 +03:00
// Check if a client can subscribe to a topic
//   sub:      the requested subscription; only sub.topic is inspected.
//   callback: receives (null, sub) for a topic in allowedSubscriptionTopics and (null, null) otherwise.
aedes.authorizeSubscribe = function (client, sub, callback) {
    // Subscription control
    obj.parent.debug("mqtt", "AuthorizeSubscribe \"" + sub.topic + "\", " + client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip));

    // If not a supported subscription, deny it by handing back null.
    const isSupported = allowedSubscriptionTopics.indexOf(sub.topic) !== -1;

    // We authorize supported topics, but will not allow agents to publish anything to other agents.
    callback(null, isSupported ? sub : null);
};
2019-10-05 03:24:30 +03:00
2019-10-04 22:18:56 +03:00
// Intercept a packet that a client is trying to publish.
// The packet is handed to handleMessage() together with the identifiers stored on the client.
// The callback parameter is intentionally left unused (see the note at the end of the function).
aedes.authorizePublish = function (client, packet, callback) {
    const remote = client.conn.xtransport + "://" + cleanRemoteAddr(client.conn.xip);
    obj.parent.debug("mqtt", "AuthorizePublish, " + remote);

    // Handle a published message
    handleMessage(client.xdbNodeKey, client.xdbMeshKey, client.xdomainid, packet.topic, packet.payload);

    // We don't accept that any client message be published, so don't call the callback.
};
2019-10-05 03:24:30 +03:00
2019-10-08 04:11:15 +03:00
// Publish a message to a specific nodeid & topic, also send this to peer servers.
//   When parent.multiServer is set, the message is first dispatched to it as a JSON
//   encoded { action: 'mqtt', nodeid, topic, message } object, then delivered locally.
obj.publish = function (nodeid, topic, message) {
    const peers = parent.multiServer;
    if (peers != null) {
        // Publish this message on peer servers.
        const envelope = { action: 'mqtt', nodeid: nodeid, topic: topic, message: message };
        peers.DispatchMessage(JSON.stringify(envelope));
    }
    obj.publishNoPeers(nodeid, topic, message);
};
// Publish a message to a specific nodeid & topic, don't send to peer servers.
//   nodeid:  key into obj.connections; nothing happens when the node has no MQTT connections.
//   topic:   topic to publish on; a client only receives the message if client.subscriptions[topic] is set.
//   message: payload; a string is converted to a Buffer, any other value is passed through unchanged.
obj.publishNoPeers = function (nodeid, topic, message) {
    // Look for any MQTT connections to send this to
    const clients = obj.connections[nodeid];
    if (clients == null) { return; }

    // Buffer.from() replaces the deprecated "new Buffer(string)" constructor
    const payload = (typeof message == 'string') ? Buffer.from(message) : message;

    for (const client of clients) {
        // Only publish to client that subscribe to the topic
        if (client.subscriptions[topic] != null) { client.publish({ cmd: 'publish', qos: 0, topic: topic, payload: payload, retain: false }); }
    }
};
2019-10-08 01:00:36 +03:00
// Handle messages coming from clients
//   Only the 'console' topic is acted upon: the payload is converted to a string and routed
//   through parent.webserver.routeAgentCommand(). Messages on any other topic are ignored.
function handleMessage(nodeid, meshid, domainid, topic, message) {
    if (topic != 'console') { return; }

    // Handle console messages
    parent.webserver.routeAgentCommand(
        { action: 'msg', type: 'console', value: message.toString(), source: 'MQTT' },
        domainid,
        nodeid,
        meshid
    );
}
2019-10-04 22:18:56 +03:00
2019-10-05 03:24:30 +03:00
// Clean a IPv6 address that encodes a IPv4 address
//   Returns null when addr is not a string, addr without its leading "::ffff:" prefix
//   when that prefix is present, and addr unchanged otherwise.
function cleanRemoteAddr(addr) {
    if (typeof addr !== 'string') { return null; }
    const mappedPrefix = '::ffff:';
    return addr.startsWith(mappedPrefix) ? addr.slice(mappedPrefix.length) : addr;
}
2019-10-10 01:56:27 +03:00
// Change a node to a new meshid
//   nodeid:    key into obj.connections.
//   newMeshId: value stored in xdbMeshKey of every MQTT client currently connected for that node.
//   Does nothing when the node has no tracked connections.
obj.changeDeviceMesh = function (nodeid, newMeshId) {
    const clients = obj.connections[nodeid];
    if (clients == null) { return; }
    // for...of visits the array elements only; for...in would also walk any enumerable inherited keys.
    for (const client of clients) { client.xdbMeshKey = newMeshId; }
};
2019-10-04 22:18:56 +03:00
return obj ;
2019-10-03 03:57:18 +03:00
}