From e53efdd337a100faad781f80868447b590d893b6 Mon Sep 17 00:00:00 2001 From: Omar Ajoue Date: Tue, 9 Feb 2021 23:32:40 +0100 Subject: [PATCH] :sparkles: Separate webhooks from core (#1408) * Unify execution ID across executions * Fix indentation and improved comments * WIP: saving data after each node execution * Added on/off to save data after each step, saving initial data and retries working * Fixing lint issues * Fixing more lint issues * :sparkles: Add bull to execute workflows * :shirt: Fix lint issue * :zap: Add graceful shutdown to worker * :zap: Add loading staticData to worker * :shirt: Fix lint issue * :zap: Fix import * Changed tables metadata to add nullable to stoppedAt * Reload database on migration run * Fixed reloading database schema for sqlite by reconnecting and fixing postgres migration * Added checks to Redis and exiting process if connection is unavailable * Fixing error with new installations * Fix issue with data not being sent back to browser on manual executions with defined destination * Merging bull and unify execution id branch fixes * Main process will now get execution success from database instead of redis * Omit execution duration if execution did not stop * Fix issue with execution list displaying inconsistant information information while a workflow is running * Remove unused hooks to clarify for developers that these wont run in queue mode * Added active pooling to help recover from Redis crashes * Lint issues * Changing default polling interval to 60 seconds * Removed unnecessary attributes from bull job * Added webhooks service and setting to disable webhooks from main process * Fixed executions list when running with queues. Now we get the list of actively running workflows from bull. * Add option to disable deregistration of webhooks on shutdown * Rename WEBHOOK_TUNNEL_URL to WEBHOOK_URL keeping backwards compat. * Added auto refresh to executions list * Improvements to workflow stop process when running with queues * Refactor queue system to use a singleton and avoid code duplication * Improve comments and remove unnecessary commits * Remove console.log from vue file * Blocking webhook process to run without queues * Handling execution stop graciously when possible * Removing initialization of all workflows from webhook process * Refactoring code to remove code duplication for job stop * Improved execution list to be more fluid and less intrusive * Fixing workflow name for current executions when auto updating * :zap: Right align autorefresh checkbox Co-authored-by: Jan Oberhauser --- .../subfolderWithSSL/docker-compose.yml | 2 +- packages/cli/commands/start.ts | 8 +- packages/cli/commands/webhook.ts | 223 +++++++++++++ packages/cli/commands/worker.ts | 13 +- packages/cli/config/index.ts | 24 ++ packages/cli/src/ActiveWorkflowRunner.ts | 4 + packages/cli/src/Queue.ts | 67 ++++ packages/cli/src/ResponseHelper.ts | 1 + packages/cli/src/Server.ts | 232 +++++++------ packages/cli/src/WebhookHelpers.ts | 7 +- packages/cli/src/WebhookServer.ts | 306 ++++++++++++++++++ .../cli/src/WorkflowExecuteAdditionalData.ts | 24 +- packages/cli/src/WorkflowRunner.ts | 44 +-- packages/cli/src/index.ts | 2 + packages/editor-ui/src/Interface.ts | 2 +- .../src/components/ExecutionsList.vue | 50 ++- .../src/components/mixins/restApi.ts | 3 +- packages/editor-ui/src/views/NodeView.vue | 34 +- 18 files changed, 869 insertions(+), 177 deletions(-) create mode 100644 packages/cli/commands/webhook.ts create mode 100644 packages/cli/src/Queue.ts create mode 100644 packages/cli/src/WebhookServer.ts diff --git a/docker/compose/subfolderWithSSL/docker-compose.yml b/docker/compose/subfolderWithSSL/docker-compose.yml index aa2fe669d1..918f6395b6 100644 --- a/docker/compose/subfolderWithSSL/docker-compose.yml +++ b/docker/compose/subfolderWithSSL/docker-compose.yml @@ -49,7 +49,7 @@ services: - N8N_PROTOCOL=https - NODE_ENV=production - N8N_PATH - - WEBHOOK_TUNNEL_URL=https://${DOMAIN_NAME}${N8N_PATH} + - WEBHOOK_URL=https://${DOMAIN_NAME}${N8N_PATH} volumes: - /var/run/docker.sock:/var/run/docker.sock - ${DATA_FOLDER}/.n8n:/home/node/.n8n diff --git a/packages/cli/commands/start.ts b/packages/cli/commands/start.ts index 13bb19b817..112e751df9 100644 --- a/packages/cli/commands/start.ts +++ b/packages/cli/commands/start.ts @@ -82,8 +82,10 @@ export class Start extends Command { process.exit(processExistCode); }, 30000); + const skipWebhookDeregistration = config.get('endpoints.skipWebhoooksDeregistrationOnShutdown') as boolean; + const removePromises = []; - if (activeWorkflowRunner !== undefined) { + if (activeWorkflowRunner !== undefined && skipWebhookDeregistration !== true) { removePromises.push(activeWorkflowRunner.removeAll()); } @@ -253,8 +255,8 @@ export class Start extends Command { // @ts-ignore const webhookTunnel = await localtunnel(port, tunnelSettings); - process.env.WEBHOOK_TUNNEL_URL = webhookTunnel.url + '/'; - this.log(`Tunnel URL: ${process.env.WEBHOOK_TUNNEL_URL}\n`); + process.env.WEBHOOK_URL = webhookTunnel.url + '/'; + this.log(`Tunnel URL: ${process.env.WEBHOOK_URL}\n`); this.log('IMPORTANT! Do not share with anybody as it would give people access to your n8n instance!'); } diff --git a/packages/cli/commands/webhook.ts b/packages/cli/commands/webhook.ts new file mode 100644 index 0000000000..b17db917bd --- /dev/null +++ b/packages/cli/commands/webhook.ts @@ -0,0 +1,223 @@ +import { + UserSettings, +} from 'n8n-core'; +import { Command, flags } from '@oclif/command'; +import * as Redis from 'ioredis'; + +import * as config from '../config'; +import { + ActiveExecutions, + ActiveWorkflowRunner, + CredentialsOverwrites, + CredentialTypes, + Db, + ExternalHooks, + GenericHelpers, + LoadNodesAndCredentials, + NodeTypes, + TestWebhooks, + WebhookServer, +} from "../src"; +import { IDataObject } from 'n8n-workflow'; + + +let activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner | undefined; +let processExistCode = 0; + + +export class Webhook extends Command { + static description = 'Starts n8n webhook process. Intercepts only production URLs.'; + + static examples = [ + `$ n8n webhook`, + ]; + + static flags = { + help: flags.help({ char: 'h' }), + }; + + /** + * Stops the n8n in a graceful way. + * Make for example sure that all the webhooks from third party services + * get removed. + */ + static async stopProcess() { + console.log(`\nStopping n8n...`); + + try { + const externalHooks = ExternalHooks(); + await externalHooks.run('n8n.stop', []); + + setTimeout(() => { + // In case that something goes wrong with shutdown we + // kill after max. 30 seconds no matter what + process.exit(processExistCode); + }, 30000); + + const removePromises = []; + if (activeWorkflowRunner !== undefined) { + removePromises.push(activeWorkflowRunner.removeAll()); + } + + // Remove all test webhooks + const testWebhooks = TestWebhooks.getInstance(); + removePromises.push(testWebhooks.removeAll()); + + await Promise.all(removePromises); + + // Wait for active workflow executions to finish + const activeExecutionsInstance = ActiveExecutions.getInstance(); + let executingWorkflows = activeExecutionsInstance.getActiveExecutions(); + + let count = 0; + while (executingWorkflows.length !== 0) { + if (count++ % 4 === 0) { + console.log(`Waiting for ${executingWorkflows.length} active executions to finish...`); + } + await new Promise((resolve) => { + setTimeout(resolve, 500); + }); + executingWorkflows = activeExecutionsInstance.getActiveExecutions(); + } + + } catch (error) { + console.error('There was an error shutting down n8n.', error); + } + + process.exit(processExistCode); + } + + + async run() { + // Make sure that n8n shuts down gracefully if possible + process.on('SIGTERM', Webhook.stopProcess); + process.on('SIGINT', Webhook.stopProcess); + + const { flags } = this.parse(Webhook); + + // Wrap that the process does not close but we can still use async + await (async () => { + if (config.get('executions.mode') !== 'queue') { + /** + * It is technically possible to run without queues but + * there are 2 known bugs when running in this mode: + * - Executions list will be problematic as the main process + * is not aware of current executions in the webhook processes + * and therefore will display all current executions as error + * as it is unable to determine if it is still running or crashed + * - You cannot stop currently executing jobs from webhook processes + * when running without queues as the main process cannot talk to + * the wehbook processes to communicate workflow execution interruption. + */ + + this.error('Webhook processes can only run with execution mode as queue.'); + } + + try { + // Start directly with the init of the database to improve startup time + const startDbInitPromise = Db.init().catch(error => { + console.error(`There was an error initializing DB: ${error.message}`); + + processExistCode = 1; + // @ts-ignore + process.emit('SIGINT'); + }); + + // Make sure the settings exist + const userSettings = await UserSettings.prepareUserSettings(); + + // Load all node and credential types + const loadNodesAndCredentials = LoadNodesAndCredentials(); + await loadNodesAndCredentials.init(); + + // Load the credentials overwrites if any exist + const credentialsOverwrites = CredentialsOverwrites(); + await credentialsOverwrites.init(); + + // Load all external hooks + const externalHooks = ExternalHooks(); + await externalHooks.init(); + + // Add the found types to an instance other parts of the application can use + const nodeTypes = NodeTypes(); + await nodeTypes.init(loadNodesAndCredentials.nodeTypes); + const credentialTypes = CredentialTypes(); + await credentialTypes.init(loadNodesAndCredentials.credentialTypes); + + // Wait till the database is ready + await startDbInitPromise; + + if (config.get('executions.mode') === 'queue') { + const redisHost = config.get('queue.bull.redis.host'); + const redisPassword = config.get('queue.bull.redis.password'); + const redisPort = config.get('queue.bull.redis.port'); + const redisDB = config.get('queue.bull.redis.db'); + const redisConnectionTimeoutLimit = config.get('queue.bull.redis.timeoutThreshold'); + let lastTimer = 0, cumulativeTimeout = 0; + + const settings = { + retryStrategy: (times: number): number | null => { + const now = Date.now(); + if (now - lastTimer > 30000) { + // Means we had no timeout at all or last timeout was temporary and we recovered + lastTimer = now; + cumulativeTimeout = 0; + } else { + cumulativeTimeout += now - lastTimer; + lastTimer = now; + if (cumulativeTimeout > redisConnectionTimeoutLimit) { + console.error('Unable to connect to Redis after ' + redisConnectionTimeoutLimit + ". Exiting process."); + process.exit(1); + } + } + return 500; + }, + } as IDataObject; + + if (redisHost) { + settings.host = redisHost; + } + if (redisPassword) { + settings.password = redisPassword; + } + if (redisPort) { + settings.port = redisPort; + } + if (redisDB) { + settings.db = redisDB; + } + + // This connection is going to be our heartbeat + // IORedis automatically pings redis and tries to reconnect + // We will be using the retryStrategy above + // to control how and when to exit. + const redis = new Redis(settings); + + redis.on('error', (error) => { + if (error.toString().includes('ECONNREFUSED') === true) { + console.warn('Redis unavailable - trying to reconnect...'); + } else { + console.warn('Error with Redis: ', error); + } + }); + } + + await WebhookServer.start(); + + // Start to get active workflows and run their triggers + activeWorkflowRunner = ActiveWorkflowRunner.getInstance(); + await activeWorkflowRunner.initWebhooks(); + + const editorUrl = GenericHelpers.getBaseUrl(); + this.log('Webhook listener waiting for requests.'); + + } catch (error) { + this.error(`There was an error: ${error.message}`); + + processExistCode = 1; + // @ts-ignore + process.emit('SIGINT'); + } + })(); + } +} diff --git a/packages/cli/commands/worker.ts b/packages/cli/commands/worker.ts index ca8d2937c8..aeb3b2ba08 100644 --- a/packages/cli/commands/worker.ts +++ b/packages/cli/commands/worker.ts @@ -39,6 +39,7 @@ import { import * as config from '../config'; import * as Bull from 'bull'; +import * as Queue from '../src/Queue'; export class Worker extends Command { static description = '\nStarts a n8n worker'; @@ -112,7 +113,6 @@ export class Worker extends Command { const jobData = job.data as IBullJobData; const executionDb = await Db.collections.Execution!.findOne(jobData.executionId) as IExecutionFlattedDb; const currentExecutionDb = ResponseHelper.unflattenExecutionData(executionDb) as IExecutionResponse; - console.log(`Start job: ${job.id} (Workflow ID: ${currentExecutionDb.workflowData.id} | Execution: ${jobData.executionId})`); let staticData = currentExecutionDb.workflowData!.staticData; @@ -203,16 +203,9 @@ export class Worker extends Command { // Wait till the database is ready await startDbInitPromise; - // Connect to bull-queue - const prefix = config.get('queue.bull.prefix') as string; - const redisOptions = config.get('queue.bull.redis') as IDataObject; const redisConnectionTimeoutLimit = config.get('queue.bull.redis.timeoutThreshold'); - // Disabling ready check is necessary as it allows worker to - // quickly reconnect to Redis if Redis crashes or is unreachable - // for some time. With it enabled, worker might take minutes to realize - // redis is back up and resume working. - redisOptions.enableReadyCheck = false; - Worker.jobQueue = new Bull('jobs', { prefix, redis: redisOptions }); + + Worker.jobQueue = Queue.getInstance().getBullObjectInstance(); Worker.jobQueue.process(flags.concurrency, (job) => this.runJob(job, nodeTypes)); const versions = await GenericHelpers.getVersions(); diff --git a/packages/cli/config/index.ts b/packages/cli/config/index.ts index d9414f3dca..8d2c412f08 100644 --- a/packages/cli/config/index.ts +++ b/packages/cli/config/index.ts @@ -464,6 +464,30 @@ const config = convict({ env: 'N8N_ENDPOINT_WEBHOOK_TEST', doc: 'Path for test-webhook endpoint', }, + disableProductionWebhooksOnMainProcess: { + format: Boolean, + default: false, + env: 'N8N_DISABLE_PRODUCTION_MAIN_PROCESS', + doc: 'Disable production webhooks from main process. This helps ensures no http traffic load to main process when using webhook-specific processes.', + }, + skipWebhoooksDeregistrationOnShutdown: { + /** + * Longer explanation: n8n deregisters webhooks on shutdown / deactivation + * and registers on startup / activation. If we skip + * deactivation on shutdown, webhooks will remain active on 3rd party services. + * We don't have to worry about startup as it always + * checks if webhooks already exist. + * If users want to upgrade n8n, it is possible to run + * two instances simultaneously without downtime, similar + * to blue/green deployment. + * WARNING: Trigger nodes (like Cron) will cause duplication + * of work, so be aware when using. + */ + doc: 'Deregister webhooks on external services only when workflows are deactivated. Useful for blue/green deployments.', + format: Boolean, + default: false, + env: 'N8N_SKIP_WEBHOOK_DEREGISTRATION_STARTUP_SHUTDOWN', + }, }, externalHookFiles: { diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index 3583007680..3b5ce249ef 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -76,6 +76,10 @@ export class ActiveWorkflowRunner { } } + async initWebhooks() { + this.activeWorkflows = new ActiveWorkflows(); + } + /** * Removes all the currently active workflows * diff --git a/packages/cli/src/Queue.ts b/packages/cli/src/Queue.ts new file mode 100644 index 0000000000..1b85fe4bc4 --- /dev/null +++ b/packages/cli/src/Queue.ts @@ -0,0 +1,67 @@ +import * as Bull from 'bull'; +import * as config from '../config'; +import { IBullJobData } from './Interfaces'; + +export class Queue { + private jobQueue: Bull.Queue; + + constructor() { + const prefix = config.get('queue.bull.prefix') as string; + const redisOptions = config.get('queue.bull.redis') as object; + // Disabling ready check is necessary as it allows worker to + // quickly reconnect to Redis if Redis crashes or is unreachable + // for some time. With it enabled, worker might take minutes to realize + // redis is back up and resume working. + // More here: https://github.com/OptimalBits/bull/issues/890 + // @ts-ignore + this.jobQueue = new Bull('jobs', { prefix, redis: redisOptions, enableReadyCheck: false }); + } + + async add(jobData: IBullJobData, jobOptions: object): Promise { + return await this.jobQueue.add(jobData,jobOptions); + } + + async getJob(jobId: Bull.JobId): Promise { + return await this.jobQueue.getJob(jobId); + } + + async getJobs(jobTypes: Bull.JobStatus[]): Promise { + return await this.jobQueue.getJobs(jobTypes); + } + + getBullObjectInstance(): Bull.Queue { + return this.jobQueue; + } + + /** + * + * @param job A Bull.Job instance + * @returns boolean true if we were able to securely stop the job + */ + async stopJob(job: Bull.Job): Promise { + if (await job.isActive()) { + // Job is already running so tell it to stop + await job.progress(-1); + return true; + } else { + // Job did not get started yet so remove from queue + try { + await job.remove(); + return true; + } catch (e) { + await job.progress(-1); + } + } + return false; + } +} + +let activeQueueInstance: Queue | undefined; + +export function getInstance(): Queue { + if (activeQueueInstance === undefined) { + activeQueueInstance = new Queue(); + } + + return activeQueueInstance; +} diff --git a/packages/cli/src/ResponseHelper.ts b/packages/cli/src/ResponseHelper.ts index 4458316dcf..014589e0b8 100644 --- a/packages/cli/src/ResponseHelper.ts +++ b/packages/cli/src/ResponseHelper.ts @@ -188,6 +188,7 @@ export function unflattenExecutionData(fullExecutionData: IExecutionFlattedDb): startedAt: fullExecutionData.startedAt, stoppedAt: fullExecutionData.stoppedAt, finished: fullExecutionData.finished ? fullExecutionData.finished : false, + workflowId: fullExecutionData.workflowId, }); return returnData; diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index f61ada179d..8149d776bb 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -62,6 +62,7 @@ import { ResponseHelper, TestWebhooks, WebhookHelpers, + WebhookServer, WorkflowCredentials, WorkflowExecuteAdditionalData, WorkflowRunner, @@ -105,6 +106,7 @@ import * as jwks from 'jwks-rsa'; import * as timezones from 'google-timezones-json'; import * as parseUrl from 'parseurl'; import * as querystring from 'querystring'; +import * as Queue from '../src/Queue'; import { OptionsWithUrl } from 'request-promise-native'; class App { @@ -1428,7 +1430,14 @@ class App { limit = parseInt(req.query.limit as string, 10); } - const executingWorkflowIds = this.activeExecutionsInstance.getActiveExecutions().map(execution => execution.id.toString()) as string[]; + let executingWorkflowIds; + + if (config.get('executions.mode') === 'queue') { + const currentJobs = await Queue.getInstance().getJobs(['active', 'waiting']); + executingWorkflowIds = currentJobs.map(job => job.data.executionId) as string[]; + } else { + executingWorkflowIds = this.activeExecutionsInstance.getActiveExecutions().map(execution => execution.id.toString()) as string[]; + } const countFilter = JSON.parse(JSON.stringify(filter)); countFilter.select = ['id']; @@ -1453,10 +1462,10 @@ class App { resultsQuery.andWhere(`execution.${filterField} = :${filterField}`, {[filterField]: filter[filterField]}); }); if (req.query.lastId) { - resultsQuery.andWhere(`execution.id <= :lastId`, {lastId: req.query.lastId}); + resultsQuery.andWhere(`execution.id < :lastId`, {lastId: req.query.lastId}); } if (req.query.firstId) { - resultsQuery.andWhere(`execution.id >= :firstId`, {firstId: req.query.firstId}); + resultsQuery.andWhere(`execution.id > :firstId`, {firstId: req.query.firstId}); } if (executingWorkflowIds.length > 0) { resultsQuery.andWhere(`execution.id NOT IN (:...ids)`, {ids: executingWorkflowIds}); @@ -1626,52 +1635,114 @@ class App { // Returns all the currently working executions this.app.get(`/${this.restEndpoint}/executions-current`, ResponseHelper.send(async (req: express.Request, res: express.Response): Promise => { - const executingWorkflows = this.activeExecutionsInstance.getActiveExecutions(); + if (config.get('executions.mode') === 'queue') { + const currentJobs = await Queue.getInstance().getJobs(['active', 'waiting']); - const returnData: IExecutionsSummary[] = []; + const currentlyRunningExecutionIds = currentJobs.map(job => job.data.executionId); - let filter: any = {}; // tslint:disable-line:no-any - if (req.query.filter) { - filter = JSON.parse(req.query.filter as string); - } - - for (const data of executingWorkflows) { - if (filter.workflowId !== undefined && filter.workflowId !== data.workflowId) { - continue; - } - returnData.push( - { - idActive: data.id.toString(), - workflowId: data.workflowId.toString(), - mode: data.mode, - retryOf: data.retryOf, - startedAt: new Date(data.startedAt), + const resultsQuery = await Db.collections.Execution! + .createQueryBuilder("execution") + .select([ + 'execution.id', + 'execution.workflowId', + 'execution.mode', + 'execution.retryOf', + 'execution.startedAt', + ]) + .orderBy('execution.id', 'DESC') + .andWhere(`execution.id IN (:...ids)`, {ids: currentlyRunningExecutionIds}); + + if (req.query.filter) { + const filter = JSON.parse(req.query.filter as string); + if (filter.workflowId !== undefined) { + resultsQuery.andWhere('execution.workflowId = :workflowId', {workflowId: filter.workflowId}); } - ); - } + } - return returnData; + const results = await resultsQuery.getMany(); + + return results.map(result => { + return { + idActive: result.id, + workflowId: result.workflowId, + mode: result.mode, + retryOf: result.retryOf !== null ? result.retryOf : undefined, + startedAt: new Date(result.startedAt), + } as IExecutionsSummary; + }); + } else { + const executingWorkflows = this.activeExecutionsInstance.getActiveExecutions(); + + const returnData: IExecutionsSummary[] = []; + + let filter: any = {}; // tslint:disable-line:no-any + if (req.query.filter) { + filter = JSON.parse(req.query.filter as string); + } + + for (const data of executingWorkflows) { + if (filter.workflowId !== undefined && filter.workflowId !== data.workflowId) { + continue; + } + returnData.push( + { + idActive: data.id.toString(), + workflowId: data.workflowId.toString(), + mode: data.mode, + retryOf: data.retryOf, + startedAt: new Date(data.startedAt), + } + ); + } + + return returnData; + } })); // Forces the execution to stop this.app.post(`/${this.restEndpoint}/executions-current/:id/stop`, ResponseHelper.send(async (req: express.Request, res: express.Response): Promise => { - const executionId = req.params.id; + if (config.get('executions.mode') === 'queue') { + const currentJobs = await Queue.getInstance().getJobs(['active', 'waiting']); - // Stopt he execution and wait till it is done and we got the data - const result = await this.activeExecutionsInstance.stopExecution(executionId); + const job = currentJobs.find(job => job.data.executionId.toString() === req.params.id); - if (result === undefined) { - throw new Error(`The execution id "${executionId}" could not be found.`); + if (!job) { + throw new Error(`Could not stop "${req.params.id}" as it is no longer in queue.`); + } else { + await Queue.getInstance().stopJob(job); + } + + const executionDb = await Db.collections.Execution?.findOne(req.params.id) as IExecutionFlattedDb; + const fullExecutionData = ResponseHelper.unflattenExecutionData(executionDb) as IExecutionResponse; + + const returnData: IExecutionsStopData = { + mode: fullExecutionData.mode, + startedAt: new Date(fullExecutionData.startedAt), + stoppedAt: fullExecutionData.stoppedAt ? new Date(fullExecutionData.stoppedAt) : undefined, + finished: fullExecutionData.finished, + }; + + return returnData; + + } else { + const executionId = req.params.id; + + // Stopt he execution and wait till it is done and we got the data + const result = await this.activeExecutionsInstance.stopExecution(executionId); + + if (result === undefined) { + throw new Error(`The execution id "${executionId}" could not be found.`); + } + + const returnData: IExecutionsStopData = { + mode: result.mode, + startedAt: new Date(result.startedAt), + stoppedAt: result.stoppedAt ? new Date(result.stoppedAt) : undefined, + finished: result.finished, + }; + + return returnData; } - - const returnData: IExecutionsStopData = { - mode: result.mode, - startedAt: new Date(result.startedAt), - stoppedAt: result.stoppedAt ? new Date(result.stoppedAt) : undefined, - finished: result.finished, - }; - - return returnData; })); @@ -1711,88 +1782,9 @@ class App { // Webhooks // ---------------------------------------- - // HEAD webhook requests - this.app.head(`/${this.endpointWebhook}/*`, async (req: express.Request, res: express.Response) => { - // Cut away the "/webhook/" to get the registred part of the url - const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhook.length + 2); - - let response; - try { - response = await this.activeWorkflowRunner.executeWebhook('HEAD', requestUrl, req, res); - } catch (error) { - ResponseHelper.sendErrorResponse(res, error); - return; - } - - if (response.noWebhookResponse === true) { - // Nothing else to do as the response got already sent - return; - } - - ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); - }); - - // OPTIONS webhook requests - this.app.options(`/${this.endpointWebhook}/*`, async (req: express.Request, res: express.Response) => { - // Cut away the "/webhook/" to get the registred part of the url - const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhook.length + 2); - - let allowedMethods: string[]; - try { - allowedMethods = await this.activeWorkflowRunner.getWebhookMethods(requestUrl); - allowedMethods.push('OPTIONS'); - - // Add custom "Allow" header to satisfy OPTIONS response. - res.append('Allow', allowedMethods); - } catch (error) { - ResponseHelper.sendErrorResponse(res, error); - return; - } - - ResponseHelper.sendSuccessResponse(res, {}, true, 204); - }); - - // GET webhook requests - this.app.get(`/${this.endpointWebhook}/*`, async (req: express.Request, res: express.Response) => { - // Cut away the "/webhook/" to get the registred part of the url - const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhook.length + 2); - - let response; - try { - response = await this.activeWorkflowRunner.executeWebhook('GET', requestUrl, req, res); - } catch (error) { - ResponseHelper.sendErrorResponse(res, error); - return; - } - - if (response.noWebhookResponse === true) { - // Nothing else to do as the response got already sent - return; - } - - ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); - }); - - // POST webhook requests - this.app.post(`/${this.endpointWebhook}/*`, async (req: express.Request, res: express.Response) => { - // Cut away the "/webhook/" to get the registred part of the url - const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhook.length + 2); - - let response; - try { - response = await this.activeWorkflowRunner.executeWebhook('POST', requestUrl, req, res); - } catch (error) { - ResponseHelper.sendErrorResponse(res, error); - return; - } - - if (response.noWebhookResponse === true) { - // Nothing else to do as the response got already sent - return; - } - - ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); - }); + if (config.get('endpoints.disableProductionWebhooksOnMainProcess') !== true) { + WebhookServer.registerProductionWebhooks.apply(this); + } // HEAD webhook requests (test for UI) this.app.head(`/${this.endpointWebhookTest}/*`, async (req: express.Request, res: express.Response) => { diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/WebhookHelpers.ts index c03d44df91..8c5ad273b6 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/WebhookHelpers.ts @@ -453,8 +453,11 @@ export function getWorkflowWebhooksBasic(workflow: Workflow): IWebhookData[] { export function getWebhookBaseUrl() { let urlBaseWebhook = GenericHelpers.getBaseUrl(); - if (process.env.WEBHOOK_TUNNEL_URL !== undefined) { - urlBaseWebhook = process.env.WEBHOOK_TUNNEL_URL; + // We renamed WEBHOOK_TUNNEL_URL to WEBHOOK_URL. This is here to maintain + // backward compatibility. Will be deprecated and removed in the future. + if (process.env.WEBHOOK_TUNNEL_URL !== undefined || process.env.WEBHOOK_URL !== undefined) { + // @ts-ignore + urlBaseWebhook = process.env.WEBHOOK_TUNNEL_URL || process.env.WEBHOOK_URL; } return urlBaseWebhook; diff --git a/packages/cli/src/WebhookServer.ts b/packages/cli/src/WebhookServer.ts new file mode 100644 index 0000000000..becfb71af7 --- /dev/null +++ b/packages/cli/src/WebhookServer.ts @@ -0,0 +1,306 @@ +import * as express from 'express'; +import { + readFileSync, +} from 'fs'; +import { + getConnectionManager, +} from 'typeorm'; +import * as bodyParser from 'body-parser'; +require('body-parser-xml')(bodyParser); +import * as _ from 'lodash'; + +import { + ActiveExecutions, + ActiveWorkflowRunner, + Db, + ExternalHooks, + GenericHelpers, + ICustomRequest, + IExternalHooksClass, + IPackageVersions, + ResponseHelper, +} from './'; + +import * as compression from 'compression'; +import * as config from '../config'; +import * as parseUrl from 'parseurl'; + +export function registerProductionWebhooks() { + // HEAD webhook requests + this.app.head(`/${this.endpointWebhook}/*`, async (req: express.Request, res: express.Response) => { + // Cut away the "/webhook/" to get the registred part of the url + const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhook.length + 2); + + let response; + try { + response = await this.activeWorkflowRunner.executeWebhook('HEAD', requestUrl, req, res); + } catch (error) { + ResponseHelper.sendErrorResponse(res, error); + return; + } + + if (response.noWebhookResponse === true) { + // Nothing else to do as the response got already sent + return; + } + + ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); + }); + + // OPTIONS webhook requests + this.app.options(`/${this.endpointWebhook}/*`, async (req: express.Request, res: express.Response) => { + // Cut away the "/webhook/" to get the registred part of the url + const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhook.length + 2); + + let allowedMethods: string[]; + try { + allowedMethods = await this.activeWorkflowRunner.getWebhookMethods(requestUrl); + allowedMethods.push('OPTIONS'); + + // Add custom "Allow" header to satisfy OPTIONS response. + res.append('Allow', allowedMethods); + } catch (error) { + ResponseHelper.sendErrorResponse(res, error); + return; + } + + ResponseHelper.sendSuccessResponse(res, {}, true, 204); + }); + + // GET webhook requests + this.app.get(`/${this.endpointWebhook}/*`, async (req: express.Request, res: express.Response) => { + // Cut away the "/webhook/" to get the registred part of the url + const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhook.length + 2); + + let response; + try { + response = await this.activeWorkflowRunner.executeWebhook('GET', requestUrl, req, res); + } catch (error) { + ResponseHelper.sendErrorResponse(res, error); + return; + } + + if (response.noWebhookResponse === true) { + // Nothing else to do as the response got already sent + return; + } + + ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); + }); + + // POST webhook requests + this.app.post(`/${this.endpointWebhook}/*`, async (req: express.Request, res: express.Response) => { + // Cut away the "/webhook/" to get the registred part of the url + const requestUrl = (req as ICustomRequest).parsedUrl!.pathname!.slice(this.endpointWebhook.length + 2); + + let response; + try { + response = await this.activeWorkflowRunner.executeWebhook('POST', requestUrl, req, res); + } catch (error) { + ResponseHelper.sendErrorResponse(res, error); + return; + } + + if (response.noWebhookResponse === true) { + // Nothing else to do as the response got already sent + return; + } + + ResponseHelper.sendSuccessResponse(res, response.data, true, response.responseCode); + }); +} + +class App { + + app: express.Application; + activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner; + endpointWebhook: string; + endpointPresetCredentials: string; + externalHooks: IExternalHooksClass; + saveDataErrorExecution: string; + saveDataSuccessExecution: string; + saveManualExecutions: boolean; + executionTimeout: number; + maxExecutionTimeout: number; + timezone: string; + activeExecutionsInstance: ActiveExecutions.ActiveExecutions; + versions: IPackageVersions | undefined; + restEndpoint: string; + protocol: string; + sslKey: string; + sslCert: string; + + presetCredentialsLoaded: boolean; + + constructor() { + this.app = express(); + + this.endpointWebhook = config.get('endpoints.webhook') as string; + this.saveDataErrorExecution = config.get('executions.saveDataOnError') as string; + this.saveDataSuccessExecution = config.get('executions.saveDataOnSuccess') as string; + this.saveManualExecutions = config.get('executions.saveDataManualExecutions') as boolean; + this.executionTimeout = config.get('executions.timeout') as number; + this.maxExecutionTimeout = config.get('executions.maxTimeout') as number; + this.timezone = config.get('generic.timezone') as string; + this.restEndpoint = config.get('endpoints.rest') as string; + + this.activeWorkflowRunner = ActiveWorkflowRunner.getInstance(); + + this.activeExecutionsInstance = ActiveExecutions.getInstance(); + + this.protocol = config.get('protocol'); + this.sslKey = config.get('ssl_key'); + this.sslCert = config.get('ssl_cert'); + + this.externalHooks = ExternalHooks(); + + this.presetCredentialsLoaded = false; + this.endpointPresetCredentials = config.get('credentials.overwrite.endpoint') as string; + } + + + /** + * Returns the current epoch time + * + * @returns {number} + * @memberof App + */ + getCurrentDate(): Date { + return new Date(); + } + + + async config(): Promise { + + this.versions = await GenericHelpers.getVersions(); + + // Compress the response data + this.app.use(compression()); + + // Make sure that each request has the "parsedUrl" parameter + this.app.use((req: express.Request, res: express.Response, next: express.NextFunction) => { + (req as ICustomRequest).parsedUrl = parseUrl(req); + // @ts-ignore + req.rawBody = Buffer.from('', 'base64'); + next(); + }); + + // Support application/json type post data + this.app.use(bodyParser.json({ + limit: '16mb', verify: (req, res, buf) => { + // @ts-ignore + req.rawBody = buf; + }, + })); + + // Support application/xml type post data + // @ts-ignore + this.app.use(bodyParser.xml({ + limit: '16mb', xmlParseOptions: { + normalize: true, // Trim whitespace inside text nodes + normalizeTags: true, // Transform tags to lowercase + explicitArray: false, // Only put properties in array if length > 1 + }, + })); + + this.app.use(bodyParser.text({ + limit: '16mb', verify: (req, res, buf) => { + // @ts-ignore + req.rawBody = buf; + }, + })); + + //support application/x-www-form-urlencoded post data + this.app.use(bodyParser.urlencoded({ extended: false, + verify: (req, res, buf) => { + // @ts-ignore + req.rawBody = buf; + }, + })); + + if (process.env['NODE_ENV'] !== 'production') { + this.app.use((req: express.Request, res: express.Response, next: express.NextFunction) => { + // Allow access also from frontend when developing + res.header('Access-Control-Allow-Origin', 'http://localhost:8080'); + res.header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS, PUT, PATCH, DELETE'); + res.header('Access-Control-Allow-Headers', 'Origin, X-Requested-With, Content-Type, Accept, sessionid'); + next(); + }); + } + + + this.app.use((req: express.Request, res: express.Response, next: express.NextFunction) => { + if (Db.collections.Workflow === null) { + const error = new ResponseHelper.ResponseError('Database is not ready!', undefined, 503); + return ResponseHelper.sendErrorResponse(res, error); + } + + next(); + }); + + + + // ---------------------------------------- + // Healthcheck + // ---------------------------------------- + + + // Does very basic health check + this.app.get('/healthz', async (req: express.Request, res: express.Response) => { + + const connectionManager = getConnectionManager(); + + if (connectionManager.connections.length === 0) { + const error = new ResponseHelper.ResponseError('No Database connection found!', undefined, 503); + return ResponseHelper.sendErrorResponse(res, error); + } + + if (connectionManager.connections[0].isConnected === false) { + // Connection is not active + const error = new ResponseHelper.ResponseError('Database connection not active!', undefined, 503); + return ResponseHelper.sendErrorResponse(res, error); + } + + // Everything fine + const responseData = { + status: 'ok', + }; + + ResponseHelper.sendSuccessResponse(res, responseData, true, 200); + }); + + registerProductionWebhooks.apply(this); + + } + +} + +export async function start(): Promise { + const PORT = config.get('port'); + const ADDRESS = config.get('listen_address'); + + const app = new App(); + + await app.config(); + + let server; + + if (app.protocol === 'https' && app.sslKey && app.sslCert) { + const https = require('https'); + const privateKey = readFileSync(app.sslKey, 'utf8'); + const cert = readFileSync(app.sslCert, 'utf8'); + const credentials = { key: privateKey, cert }; + server = https.createServer(credentials, app.app); + } else { + const http = require('http'); + server = http.createServer(app.app); + } + + server.listen(PORT, ADDRESS, async () => { + const versions = await GenericHelpers.getVersions(); + console.log(`n8n ready on ${ADDRESS}, port ${PORT}`); + console.log(`Version: ${versions.cli}`); + + await app.externalHooks.run('n8n.ready', [app]); + }); +} diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index d7e5a19312..2ef00c0473 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -193,20 +193,24 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { workflowExecuteBefore: [ async function (this: WorkflowHooks): Promise { // Push data to editor-ui once workflow finished - const pushInstance = Push.getInstance(); - pushInstance.send('executionStarted', { - executionId: this.executionId, - mode: this.mode, - startedAt: new Date(), - retryOf: this.retryOf, - workflowId: this.workflowData.id as string, - workflowName: this.workflowData.name, - }); + if (this.mode === 'manual') { + const pushInstance = Push.getInstance(); + pushInstance.send('executionStarted', { + executionId: this.executionId, + mode: this.mode, + startedAt: new Date(), + retryOf: this.retryOf, + workflowId: this.workflowData.id as string, + workflowName: this.workflowData.name, + }); + } }, ], workflowExecuteAfter: [ async function (this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject): Promise { - pushExecutionFinished(this.mode, fullRunData, this.executionId, undefined, this.retryOf); + if (this.mode === 'manual') { + pushExecutionFinished(this.mode, fullRunData, this.executionId, undefined, this.retryOf); + } }, ], }; diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index 53e61e219f..5338a4a46d 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -41,6 +41,7 @@ import { join as pathJoin } from 'path'; import { fork } from 'child_process'; import * as Bull from 'bull'; +import * as Queue from './Queue'; export class WorkflowRunner { activeExecutions: ActiveExecutions.ActiveExecutions; @@ -57,11 +58,7 @@ export class WorkflowRunner { const executionsMode = config.get('executions.mode') as string; if (executionsMode === 'queue') { - // Connect to bull-queue - const prefix = config.get('queue.bull.prefix') as string; - const redisOptions = config.get('queue.bull.redis') as object; - // @ts-ignore - this.jobQueue = new Bull('jobs', { prefix, redis: redisOptions, enableReadyCheck: false }); + this.jobQueue = Queue.getInstance().getBullObjectInstance(); } } @@ -251,30 +248,23 @@ export class WorkflowRunner { const workflowExecution: PCancelable = new PCancelable(async (resolve, reject, onCancel) => { onCancel.shouldReject = false; onCancel(async () => { - if (await job.isActive()) { - // Job is already running so tell it to stop - await job.progress(-1); - } else { - // Job did not get started yet so remove from queue - await job.remove(); + await Queue.getInstance().stopJob(job); - const fullRunData: IRun = { - data: { - resultData: { - error: { - message: 'Workflow has been canceled!', - } as IExecutionError, - runData: {}, - }, + const fullRunData :IRun = { + data: { + resultData: { + error: { + message: 'Workflow has been canceled!', + } as IExecutionError, + runData: {}, }, - mode: data.executionMode, - startedAt: new Date(), - stoppedAt: new Date(), - }; - - this.activeExecutions.remove(executionId, fullRunData); - resolve(fullRunData); - } + }, + mode: data.executionMode, + startedAt: new Date(), + stoppedAt: new Date(), + }; + this.activeExecutions.remove(executionId, fullRunData); + resolve(fullRunData); }); const jobData: Promise = job.finished(); diff --git a/packages/cli/src/index.ts b/packages/cli/src/index.ts index 3a6337a35d..cc7e942fe5 100644 --- a/packages/cli/src/index.ts +++ b/packages/cli/src/index.ts @@ -17,6 +17,7 @@ import * as ResponseHelper from './ResponseHelper'; import * as Server from './Server'; import * as TestWebhooks from './TestWebhooks'; import * as WebhookHelpers from './WebhookHelpers'; +import * as WebhookServer from './WebhookServer'; import * as WorkflowExecuteAdditionalData from './WorkflowExecuteAdditionalData'; import * as WorkflowHelpers from './WorkflowHelpers'; export { @@ -29,6 +30,7 @@ export { Server, TestWebhooks, WebhookHelpers, + WebhookServer, WorkflowExecuteAdditionalData, WorkflowHelpers, }; diff --git a/packages/editor-ui/src/Interface.ts b/packages/editor-ui/src/Interface.ts index 8f4151b08f..b4a5e248ca 100644 --- a/packages/editor-ui/src/Interface.ts +++ b/packages/editor-ui/src/Interface.ts @@ -125,7 +125,7 @@ export interface IRestApi { getActiveWorkflows(): Promise; getActivationError(id: string): Promise; getCurrentExecutions(filter: object): Promise; - getPastExecutions(filter: object, limit: number, lastId?: string | number): Promise; + getPastExecutions(filter: object, limit: number, lastId?: string | number, firstId?: string | number): Promise; stopCurrentExecution(executionId: string): Promise; makeRestApiRequest(method: string, endpoint: string, data?: any): Promise; // tslint:disable-line:no-any getSettings(): Promise; diff --git a/packages/editor-ui/src/components/ExecutionsList.vue b/packages/editor-ui/src/components/ExecutionsList.vue index b4285bfce6..806fcd5853 100644 --- a/packages/editor-ui/src/components/ExecutionsList.vue +++ b/packages/editor-ui/src/components/ExecutionsList.vue @@ -28,7 +28,10 @@ -   +   + + + Auto refresh @@ -191,6 +194,8 @@ export default mixins( finishedExecutionsCount: 0, checkAll: false, + autoRefresh: true, + autoRefreshInterval: undefined as undefined | NodeJS.Timer, filter: { status: 'ALL', @@ -292,6 +297,10 @@ export default mixins( // Handle the close externally as the visible parameter is an external prop // and is so not allowed to be changed here. this.$emit('closeDialog'); + if (this.autoRefreshInterval) { + clearInterval(this.autoRefreshInterval); + this.autoRefreshInterval = undefined; + } return false; }, displayExecution (execution: IExecutionShortResponse) { @@ -301,6 +310,18 @@ export default mixins( }); this.closeDialog(); }, + handleAutoRefreshToggle () { + if (this.autoRefreshInterval) { + // Clear any previously existing intervals (if any - there shouldn't) + clearInterval(this.autoRefreshInterval); + this.autoRefreshInterval = undefined; + } + + + if (this.autoRefresh) { + this.autoRefreshInterval = setInterval(this.loadAutoRefresh, 4 * 1000); // refresh data every 4 secs + } + }, handleCheckAllChange () { if (this.checkAll === false) { Vue.set(this, 'selectedItems', {}); @@ -389,6 +410,27 @@ export default mixins( this.$store.commit('setActiveExecutions', activeExecutions); }, + async loadAutoRefresh () : Promise { + let firstId: string | number | undefined = 0; + if (this.finishedExecutions.length !== 0) { + firstId = this.finishedExecutions[0].id; + } + const activeExecutionsPromise: Promise = this.restApi().getPastExecutions({}, 100, undefined, firstId); + const currentExecutionsPromise: Promise = this.restApi().getCurrentExecutions({}); + + const results = await Promise.all([activeExecutionsPromise, currentExecutionsPromise]); + + for (const activeExecution of results[1]) { + if (activeExecution.workflowId !== undefined && activeExecution.workflowName === undefined) { + activeExecution.workflowName = this.getWorkflowName(activeExecution.workflowId); + } + } + + this.$store.commit('setActiveExecutions', results[1]); + + this.finishedExecutions.unshift.apply(this.finishedExecutions, results[0].results); + this.finishedExecutionsCount = results[0].count; + }, async loadFinishedExecutions (): Promise { if (this.filter.status === 'running') { this.finishedExecutions = []; @@ -459,6 +501,7 @@ export default mixins( await this.loadWorkflows(); await this.refreshData(); + this.handleAutoRefreshToggle(); }, async retryExecution (execution: IExecutionShortResponse, loadWorkflow?: boolean) { this.isDataLoading = true; @@ -544,6 +587,11 @@ export default mixins(