1
1
mirror of https://github.com/leon-ai/leon.git synced 2024-11-27 08:06:03 +03:00

feat(server): execute modules over HTTP

This commit is contained in:
louistiti 2022-01-09 02:32:02 +08:00
parent cc0fb92351
commit 2e5b2c59da
No known key found for this signature in database
GPG Key ID: 7ECA3DD523793FE6
9 changed files with 313 additions and 166 deletions

View File

@ -59,9 +59,11 @@
]
},
{
"method": "GET",
"method": "POST",
"route": "/p/checker/isitdown/run",
"params": []
"params": [
"url"
]
},
{
"method": "GET",

View File

@ -14,7 +14,14 @@
"Check if nodejs.org is up",
"Check if nodejs.org is working",
"Check if amazon.com is up or down"
]
],
"http_api": {
"entities": [
{
"name": "url"
}
]
}
}
},
"haveibeenpwned": {

View File

@ -29,11 +29,12 @@ export default () => new Promise(async (resolve, reject) => {
let isFileNeedToBeGenerated = true
let pkgObj = { }
// Check if a new routing generation is necessary
if (fs.existsSync(outputFilePath)) {
const mtimeEndpoints = fs.statSync(outputFilePath).mtime.getTime()
for (let i = 0; i < packages.length; i += 1) {
const pkg = packages[i]
const fileInfo = fs.statSync(`${packagesDir}/${pkg}/data/expressions/${lang}.json`)
const mtime = fileInfo.mtime.getTime()
@ -64,7 +65,7 @@ export default () => new Promise(async (resolve, reject) => {
const action = actions[k]
const actionObj = pkgObj[module][action]
const { entities, http_api } = actionObj // eslint-disable-line camelcase
let finalMethod = entities ? 'POST' : 'GET'
let finalMethod = entities || http_api?.entities ? 'POST' : 'GET'
if (http_api?.method) {
finalMethod = http_api.method.toUpperCase()
@ -85,6 +86,8 @@ export default () => new Promise(async (resolve, reject) => {
}
if (entities) {
endpoint.params = entities.map((entity) => entity.name)
} else if (http_api?.entities) {
endpoint.params = http_api.entities.map((entity) => entity.name)
}
finalObj.endpoints.push(endpoint)

View File

@ -5,7 +5,6 @@ import { langs } from '@@/core/langs.json'
import log from '@/helpers/log'
import string from '@/helpers/string'
import Synchronizer from '@/core/synchronizer'
import Tts from '@/tts/tts'
class Brain {
constructor (socket, lang) {
@ -15,6 +14,7 @@ class Brain {
this.process = { }
this.interOutput = { }
this.finalOutput = { }
this._tts = { }
// Read into the language file
const file = `${__dirname}/../data/${this.lang}.json`
@ -24,12 +24,10 @@ class Brain {
log.title('Brain')
log.success('New instance')
}
if (process.env.LEON_TTS === 'true') {
// Init TTS
this.tts = new Tts(this.socket, process.env.LEON_TTS_PROVIDER)
this.tts.init()
}
set tts (newTts) {
this._tts = newTts
}
/**
@ -55,7 +53,7 @@ class Brain {
// Stripe HTML to a whitespace. Whitespace to let the TTS respects punctuation
const speech = rawSpeech.replace(/<(?:.|\n)*?>/gm, ' ')
this.tts.add(speech, end)
this._tts.add(speech, end)
}
this.socket.emit('answer', rawSpeech)
@ -92,17 +90,34 @@ class Brain {
/**
* Execute Python modules
*/
execute (obj) {
execute (obj, opts) {
const executionTimeStart = Date.now()
opts = opts || {
mute: false // Close Leon mouth e.g. over HTTP
}
return new Promise((resolve, reject) => {
const queryId = `${Date.now()}-${string.random(4)}`
const queryObjectPath = `${__dirname}/../tmp/${queryId}.json`
const speeches = []
// Ask to repeat if Leon is not sure about the request
if (obj.classification.confidence < langs[process.env.LEON_LANG].min_confidence) {
this.talk(`${this.wernicke('random_not_sure')}.`, true)
this.socket.emit('is-typing', false)
if (!opts.mute) {
const speech = `${this.wernicke('random_not_sure')}.`
resolve()
speeches.push(speech)
this.talk(speech, true)
this.socket.emit('is-typing', false)
}
const executionTimeEnd = Date.now()
const executionTime = executionTimeEnd - executionTimeStart
resolve({
speeches,
executionTime
})
} else {
// Ensure the process is empty (to be able to execute other processes outside of Brain)
if (Object.keys(this.process).length === 0) {
@ -146,25 +161,51 @@ class Brain {
log.info(data.toString())
this.interOutput = obj.output
this.talk(obj.output.speech.toString())
const speech = obj.output.speech.toString()
if (!opts.mute) {
this.talk(speech)
}
speeches.push(speech)
} else {
output += data
}
} else {
const executionTimeEnd = Date.now()
const executionTime = executionTimeEnd - executionTimeStart
/* istanbul ignore next */
reject({ type: 'warning', obj: new Error(`The ${moduleName} module of the ${packageName} package is not well configured. Check the configuration file.`) })
reject({
type: 'warning',
obj: new Error(`The ${moduleName} module of the ${packageName} package is not well configured. Check the configuration file.`),
speeches,
executionTime
})
}
})
// Handle error
this.process.stderr.on('data', (data) => {
this.talk(`${this.wernicke('random_package_module_errors', '',
{ '%module_name%': moduleName, '%package_name%': packageName })}!`)
const speech = `${this.wernicke('random_package_module_errors', '',
{ '%module_name%': moduleName, '%package_name%': packageName })}!`
if (!opts.mute) {
this.talk(speech)
this.socket.emit('is-typing', false)
}
speeches.push(speech)
Brain.deleteQueryObjFile(queryObjectPath)
this.socket.emit('is-typing', false)
log.title(packageName)
reject({ type: 'error', obj: new Error(data) })
const executionTimeEnd = Date.now()
const executionTime = executionTimeEnd - executionTimeStart
reject({
type: 'error',
obj: new Error(data),
speeches,
executionTime
})
})
// Catch the end of the module execution
@ -177,7 +218,12 @@ class Brain {
// Check if there is an output (no module error)
if (this.finalOutput !== '') {
this.finalOutput = JSON.parse(this.finalOutput).output
this.talk(this.finalOutput.speech.toString(), true)
const speech = this.finalOutput.speech.toString()
if (!opts.mute) {
this.talk(speech, true)
}
speeches.push(speech)
/* istanbul ignore next */
// Synchronize the downloaded content if enabled
@ -191,14 +237,27 @@ class Brain {
// When the synchronization is finished
sync.synchronize((speech) => {
this.talk(speech)
if (!opts.mute) {
this.talk(speech)
}
speeches.push(speech)
})
}
}
Brain.deleteQueryObjFile(queryObjectPath)
this.socket.emit('is-typing', false)
resolve()
if (!opts.mute) {
this.socket.emit('is-typing', false)
}
const executionTimeEnd = Date.now()
const executionTime = executionTimeEnd - executionTimeStart
resolve({
speeches,
executionTime
})
})
// Reset the child process

View File

@ -134,6 +134,7 @@ class Nlu {
}
try {
console.log('obj', obj)
// Inject action entities with the others if there is
await this.brain.execute(obj)
} catch (e) /* istanbul ignore next */ {

View File

@ -10,6 +10,7 @@ import Nlu from '@/core/nlu'
import Brain from '@/core/brain'
import Asr from '@/core/asr'
import Stt from '@/stt/stt'
import Tts from '@/tts/tts'
import corsMidd from '@/plugins/cors'
import otherMidd from '@/plugins/other'
import infoPlugin from '@/api/info/index'
@ -17,167 +18,238 @@ import downloadsPlugin from '@/api/downloads/index'
import log from '@/helpers/log'
import date from '@/helpers/date'
class Server {
constructor () {
this.fastify = Fastify()
this.httpServer = { }
this.brain = { }
this.nlu = { }
this.asr = { }
this.stt = { }
}
const server = { }
const fastify = Fastify()
let brain = { }
let httpServer = { }
/**
* Server entry point
*/
async init () {
this.fastify.addHook('onRequest', corsMidd)
this.fastify.addHook('onRequest', otherMidd)
/**
* Bootstrap socket
*/
const handleOnConnection = (socket) => {
log.title('Client')
log.success('Connected')
log.title('Initialization')
log.success(`The current env is ${process.env.LEON_NODE_ENV}`)
log.success(`The current version is ${version}`)
// Init
socket.on('init', async (data) => {
log.info(`Type: ${data}`)
log.info(`Socket id: ${socket.id}`)
if (!Object.keys(langs).includes(process.env.LEON_LANG) === true) {
process.env.LEON_LANG = 'en-US'
log.warning('The language you chose is not supported, then the default language has been applied')
}
if (data === 'hotword-node') {
// Hotword triggered
socket.on('hotword-detected', (data) => {
log.title('Socket')
log.success(`Hotword ${data.hotword} detected`)
log.success(`The current language is ${process.env.LEON_LANG}`)
log.success(`The current time zone is ${date.timeZone()}`)
const sLogger = (process.env.LEON_LOGGER !== 'true') ? 'disabled' : 'enabled'
log.success(`Collaborative logger ${sLogger}`)
await this.bootstrap()
}
/**
* Bootstrap API
*/
async bootstrap () {
const apiVersion = 'v1'
// Render the web app
this.fastify.register(fastifyStatic, {
root: join(__dirname, '..', '..', '..', 'app', 'dist'),
prefix: '/'
})
this.fastify.get('/', (_request, reply) => {
reply.sendFile('index.html')
})
this.fastify.register(infoPlugin, { apiVersion })
this.fastify.register(downloadsPlugin, { apiVersion })
if (process.env.PACKAGES_OVER_HTTP === 'true') {
endpoints.forEach((endpoint) => {
this.fastify.route({
method: endpoint.method,
url: endpoint.route,
handler: (request, reply) => {
// TODO: get params request.body... endpoint.params...
// TODO: this.brain.execute(obj)
reply.send({ hello: 'world' })
}
})
socket.broadcast.emit('enable-record')
})
}
} else {
let sttState = 'disabled'
let ttsState = 'disabled'
this.httpServer = this.fastify.server
brain = new Brain(socket, langs[process.env.LEON_LANG].short)
const nlu = new Nlu(brain)
const asr = new Asr()
let stt = { }
let tts = { }
try {
await this.listen(process.env.LEON_PORT)
} catch (e) {
log.error(e.message)
}
}
/* istanbul ignore if */
if (process.env.LEON_STT === 'true') {
sttState = 'enabled'
/**
* Launch server
*/
async listen (port) {
const io = process.env.LEON_NODE_ENV === 'development'
? socketio(this.httpServer, { cors: { origin: `${process.env.LEON_HOST}:3000` } })
: socketio(this.httpServer)
stt = new Stt(socket, process.env.LEON_STT_PROVIDER)
stt.init(() => null)
}
io.on('connection', this.connection)
if (process.env.LEON_TTS === 'true') {
ttsState = 'enabled'
await this.fastify.listen(port, '0.0.0.0')
log.success(`Server is available at ${process.env.LEON_HOST}:${port}`)
}
/**
* Bootstrap socket
*/
async connection (socket) {
log.title('Client')
log.success('Connected')
// Init
socket.on('init', async (data) => {
log.info(`Type: ${data}`)
log.info(`Socket id: ${socket.id}`)
if (data === 'hotword-node') {
// Hotword triggered
socket.on('hotword-detected', (data) => {
log.title('Socket')
log.success(`Hotword ${data.hotword} detected`)
socket.broadcast.emit('enable-record')
tts = new Tts(socket, process.env.LEON_TTS_PROVIDER)
tts.init((ttsInstance) => {
brain.tts = ttsInstance
})
} else {
let sttState = 'disabled'
let ttsState = 'disabled'
}
this.brain = new Brain(socket, langs[process.env.LEON_LANG].short)
this.nlu = new Nlu(this.brain)
this.asr = new Asr()
log.title('Initialization')
log.success(`STT ${sttState}`)
log.success(`TTS ${ttsState}`)
/* istanbul ignore if */
if (process.env.LEON_STT === 'true') {
sttState = 'enabled'
// Train modules expressions
try {
await nlu.loadModel(join(__dirname, '../data/leon-model.nlp'))
} catch (e) {
log[e.type](e.obj.message)
}
this.stt = new Stt(socket, process.env.LEON_STT_PROVIDER)
this.stt.init()
}
// Listen for new query
socket.on('query', async (data) => {
log.title('Socket')
log.info(`${data.client} emitted: ${data.value}`)
if (process.env.LEON_TTS === 'true') {
ttsState = 'enabled'
}
socket.emit('is-typing', true)
await nlu.process(data.value)
})
log.title('Initialization')
log.success(`STT ${sttState}`)
log.success(`TTS ${ttsState}`)
// Train modules expressions
// Handle automatic speech recognition
socket.on('recognize', async (data) => {
try {
await this.nlu.loadModel(join(__dirname, '../data/leon-model.nlp'))
await asr.run(data, stt)
} catch (e) {
log[e.type](e.obj.message)
}
})
}
})
}
// Listen for new query
socket.on('query', async (data) => {
log.title('Socket')
log.info(`${data.client} emitted: ${data.value}`)
/**
* Launch server
*/
const listen = async (port) => {
const io = process.env.LEON_NODE_ENV === 'development'
? socketio(httpServer, { cors: { origin: `${process.env.LEON_HOST}:3000` } })
: socketio(httpServer)
socket.emit('is-typing', true)
await this.nlu.process(data.value)
})
io.on('connection', handleOnConnection)
// Handle automatic speech recognition
socket.on('recognize', async (data) => {
try {
await this.asr.run(data, this.stt)
} catch (e) {
log[e.type](e.obj.message)
await fastify.listen(port, '0.0.0.0')
log.success(`Server is available at ${process.env.LEON_HOST}:${port}`)
}
/**
* Bootstrap API
*/
const bootstrap = async () => {
const apiVersion = 'v1'
// Render the web app
fastify.register(fastifyStatic, {
root: join(__dirname, '..', '..', '..', 'app', 'dist'),
prefix: '/'
})
fastify.get('/', (_request, reply) => {
reply.sendFile('index.html')
})
fastify.register(infoPlugin, { apiVersion })
fastify.register(downloadsPlugin, { apiVersion })
if (process.env.PACKAGES_OVER_HTTP === 'true') {
// Dynamically expose Leon modules over HTTP
endpoints.forEach((endpoint) => {
fastify.route({
method: endpoint.method,
url: endpoint.route,
async handler (request, reply) {
const [, , pkg, module, action] = endpoint.route.split('/')
const { params } = endpoint
const entities = []
params.forEach((param) => {
const value = request.body[param]
// TODO: be able to handle "url": [] from entity params
let entity = {
entity: param,
sourceText: value,
utteranceText: value,
resolution: { value }
}
if (Array.isArray(value)) {
value.forEach((v) => {
entity = {
entity: param,
sourceText: v,
utteranceText: v,
resolution: { v }
}
entities.push(entity)
})
} else {
entities.push(entity)
}
})
const obj = {
query: '',
entities,
classification: {
package: pkg,
module,
action,
confidence: 1
}
}
})
}
const responseData = {
package: pkg,
module,
action,
execution_time: 0, // ms
speeches: []
}
try {
// Inject action entities with the others if there is
const { speeches, executionTime } = await brain.execute(obj, { mute: true })
reply.send({
...responseData,
entities,
speeches,
execution_time: executionTime,
success: true
})
} catch (e) /* istanbul ignore next */ {
log[e.type](e.obj.message)
reply.statusCode = 500
reply.send({
...responseData,
speeches: e.speeches,
execution_time: e.executionTime,
error: e.obj.message,
success: false
})
}
}
})
})
}
httpServer = fastify.server
try {
await listen(process.env.LEON_PORT)
} catch (e) {
log.error(e.message)
}
}
export default Server
/**
* Server entry point
*/
server.init = async () => {
fastify.addHook('onRequest', corsMidd)
fastify.addHook('onRequest', otherMidd)
log.title('Initialization')
log.success(`The current env is ${process.env.LEON_NODE_ENV}`)
log.success(`The current version is ${version}`)
if (!Object.keys(langs).includes(process.env.LEON_LANG) === true) {
process.env.LEON_LANG = 'en-US'
log.warning('The language you chose is not supported, then the default language has been applied')
}
log.success(`The current language is ${process.env.LEON_LANG}`)
log.success(`The current time zone is ${date.timeZone()}`)
const sLogger = (process.env.LEON_LOGGER !== 'true') ? 'disabled' : 'enabled'
log.success(`Collaborative logger ${sLogger}`)
await bootstrap()
}
export default server

View File

@ -1,10 +1,9 @@
import dotenv from 'dotenv'
import Server from '@/core/server'
import server from '@/core/server'
(async () => {
dotenv.config()
const server = new Server()
await server.init()
})()

View File

@ -21,7 +21,7 @@ class Stt {
/**
* Initialize the STT provider
*/
init () {
init (cb) {
log.info('Initializing STT...')
if (!this.providers.includes(this.provider)) {
@ -48,6 +48,8 @@ class Stt {
log.title('STT')
log.success('STT initialized')
cb(this)
return true
}

View File

@ -24,7 +24,7 @@ class Tts {
/**
* Initialize the TTS provider
*/
init () {
init (cb) {
log.info('Initializing TTS...')
if (!this.providers.includes(this.provider)) {
@ -50,6 +50,8 @@ class Tts {
log.title('TTS')
log.success('TTS initialized')
cb(this)
return true
}