1
1
mirror of https://github.com/n8n-io/n8n.git synced 2024-09-11 13:15:28 +03:00

fix(core): Fix pairedItem issue with partial manual executions (#8575)

Co-authored-by: Danny Martini <danny@n8n.io>
Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
Jan Oberhauser 2024-02-23 11:43:08 +01:00 committed by कारतोफ्फेलस्क्रिप्ट™
parent 3b119c8f6a
commit f9edf26a47
14 changed files with 250 additions and 27 deletions

View File

@ -569,4 +569,27 @@ describe('Execution', () => {
expect(interception.request.body.runData).to.include.all.keys(expectedRunDataKeys);
});
});
it('should successfully execute partial executions with nodes attached to the second output', () => {
cy.createFixtureWorkflow(
'Test_Workflow_pairedItem_incomplete_manual_bug.json',
'My test workflow',
);
cy.intercept('POST', '/rest/workflows/run').as('workflowRun');
workflowPage.getters.zoomToFitButton().click();
workflowPage.getters.executeWorkflowButton().click();
workflowPage.getters
.canvasNodeByName('Test Expression')
.findChildByTestId('execute-node-button')
.click({ force: true });
// Check toast (works because Cypress waits enough for the element to show after the http request node has finished)
// Wait for the execution to return.
cy.wait('@workflowRun');
// Wait again for the websocket message to arrive and the UI to update.
cy.wait(100);
workflowPage.getters.errorToast({ timeout: 1 }).should('not.exist');
});
});

View File

@ -0,0 +1,160 @@
{
"name": "Test Workflow pairedItem incomplete manual bug",
"nodes": [
{
"parameters": {},
"id": "f26332f3-c61a-4843-94bd-64a73ad161ff",
"name": "When clicking \"Test workflow\"",
"type": "n8n-nodes-base.manualTrigger",
"typeVersion": 1,
"position": [
860,
340
]
},
{
"parameters": {
"assignments": {
"assignments": [
{
"id": "bd522794-d056-48b8-9204-26f7d68288d9",
"name": "test",
"value": "a",
"type": "string"
}
]
},
"options": {}
},
"id": "fae0c907-e2bf-4ecf-82be-f9caa209f925",
"name": "Init Data",
"type": "n8n-nodes-base.set",
"typeVersion": 3.3,
"position": [
1080,
340
]
},
{
"parameters": {
"conditions": {
"options": {
"caseSensitive": true,
"leftValue": "",
"typeValidation": "strict"
},
"conditions": [
{
"id": "8db21b4b-1675-4e63-b092-7fcc45a86547",
"leftValue": "={{ $json.test }}",
"rightValue": "b",
"operator": {
"type": "string",
"operation": "equals",
"name": "filter.operator.equals"
}
}
],
"combinator": "and"
},
"options": {}
},
"id": "f7990edd-2c0f-42e6-b3ce-74c7df02b6a4",
"name": "If",
"type": "n8n-nodes-base.if",
"typeVersion": 2,
"position": [
1300,
340
]
},
{
"parameters": {},
"id": "850d48f5-0689-4cab-b30c-30e179577c82",
"name": "NoOp1",
"type": "n8n-nodes-base.noOp",
"typeVersion": 1,
"position": [
1540,
200
]
},
{
"parameters": {
"assignments": {
"assignments": [
{
"id": "bd522794-d056-48b8-9204-26f7d68288d9",
"name": "test2",
"value": "={{ $('Init Data').item.json.test }}",
"type": "string"
}
]
},
"options": {}
},
"id": "91d93c3a-a557-465e-812b-266d6277b279",
"name": "Test Expression",
"type": "n8n-nodes-base.set",
"typeVersion": 3.3,
"position": [
1540,
440
]
}
],
"pinData": {},
"connections": {
"When clicking \"Test workflow\"": {
"main": [
[
{
"node": "Init Data",
"type": "main",
"index": 0
}
]
]
},
"Init Data": {
"main": [
[
{
"node": "If",
"type": "main",
"index": 0
}
]
]
},
"If": {
"main": [
[
{
"node": "NoOp1",
"type": "main",
"index": 0
}
],
[
{
"node": "Test Expression",
"type": "main",
"index": 0
}
]
]
}
},
"active": false,
"settings": {
"executionOrder": "v1"
},
"versionId": "765a6d9b-d667-4a59-9bd7-b0bc2627b008",
"meta": {
"templateCredsSetupCompleted": true,
"instanceId": "021d3c82ba2d3bc090cbf4fc81c9312668bcc34297e022bb3438c5c88a43a5ff"
},
"id": "qnGQYw8TD58xs214",
"tags": []
}

View File

@ -3,6 +3,8 @@ import { BasePage } from './base';
import { getVisibleSelect } from '../utils';
import { NodeCreator } from './features/node-creator';
type CyGetOptions = Parameters<(typeof cy)['get']>[1];
const nodeCreator = new NodeCreator();
export class WorkflowPage extends BasePage {
url = '/workflow/new';
@ -48,7 +50,8 @@ export class WorkflowPage extends BasePage {
},
successToast: () => cy.get('.el-notification:has(.el-notification--success)'),
warningToast: () => cy.get('.el-notification:has(.el-notification--warning)'),
errorToast: () => cy.get('.el-notification:has(.el-notification--error)'),
errorToast: (options?: CyGetOptions) =>
cy.get('.el-notification:has(.el-notification--error)', options),
infoToast: () => cy.get('.el-notification:has(.el-notification--info)'),
activatorSwitch: () => cy.getByTestId('workflow-activate-switch'),
workflowMenu: () => cy.getByTestId('workflow-menu'),

View File

@ -23,6 +23,7 @@ import type {
INodeProperties,
IUserSettings,
IHttpRequestMethods,
StartNodeData,
} from 'n8n-workflow';
import type { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
@ -532,7 +533,7 @@ export interface IWorkflowExecutionDataProcess {
pinData?: IPinData;
retryOf?: string;
sessionId?: string;
startNodes?: string[];
startNodes?: StartNodeData[];
workflowData: IWorkflowBase;
userId: string;
}

View File

@ -227,9 +227,9 @@ export function getExecutionStartNode(data: IWorkflowExecutionDataProcess, workf
let startNode;
if (
data.startNodes?.length === 1 &&
Object.keys(data.pinData ?? {}).includes(data.startNodes[0])
Object.keys(data.pinData ?? {}).includes(data.startNodes[0].name)
) {
startNode = workflow.getNode(data.startNodes[0]) ?? undefined;
startNode = workflow.getNode(data.startNodes[0].name) ?? undefined;
}
return startNode;

View File

@ -101,7 +101,7 @@ export class Execute extends BaseCommand {
const user = await Container.get(OwnershipService).getInstanceOwner();
const runData: IWorkflowExecutionDataProcess = {
executionMode: 'cli',
startNodes: [startingNode.name],
startNodes: [{ name: startingNode.name, sourceData: null }],
workflowData,
userId: user.id,
};

View File

@ -620,7 +620,7 @@ export class ExecuteBatch extends BaseCommand {
const runData: IWorkflowExecutionDataProcess = {
executionMode: 'cli',
startNodes: [startingNode.name],
startNodes: [{ name: startingNode.name, sourceData: null }],
workflowData,
userId: ExecuteBatch.instanceOwner.id,
};

View File

@ -1,6 +1,13 @@
import type { IWorkflowDb } from '@/Interfaces';
import type { AuthenticatedRequest } from '@/requests';
import type { INode, IConnections, IWorkflowSettings, IRunData, IPinData } from 'n8n-workflow';
import type {
INode,
IConnections,
IWorkflowSettings,
IRunData,
IPinData,
StartNodeData,
} from 'n8n-workflow';
export declare namespace WorkflowRequest {
type CreateUpdatePayload = Partial<{
@ -19,7 +26,7 @@ export declare namespace WorkflowRequest {
workflowData: IWorkflowDb;
runData: IRunData;
pinData: IPinData;
startNodes?: string[];
startNodes?: StartNodeData[];
destinationNode?: string;
};

View File

@ -101,7 +101,11 @@ export class WorkflowExecutionService {
user: User,
sessionId?: string,
) {
const pinnedTrigger = this.selectPinnedActivatorStarter(workflowData, startNodes, pinData);
const pinnedTrigger = this.selectPinnedActivatorStarter(
workflowData,
startNodes?.map((nodeData) => nodeData.name),
pinData,
);
// If webhooks nodes exist and are active we have to wait for till we receive a call
if (
@ -143,7 +147,7 @@ export class WorkflowExecutionService {
const hasRunData = (node: INode) => runData !== undefined && !!runData[node.name];
if (pinnedTrigger && !hasRunData(pinnedTrigger)) {
data.startNodes = [pinnedTrigger.name];
data.startNodes = [{ name: pinnedTrigger.name, sourceData: null }];
}
const executionId = await this.workflowRunner.run(data);

View File

@ -25,7 +25,7 @@ describe('WorkflowHelpers', () => {
node1: {},
node2: {},
},
startNodes: ['node2'],
startNodes: [{ name: 'node2' }],
} as unknown as IWorkflowExecutionDataProcess;
const workflow = {
getNode(nodeName: string) {

View File

@ -33,6 +33,7 @@ import type {
IWorkflowExecuteAdditionalData,
WorkflowExecuteMode,
CloseFunction,
StartNodeData,
} from 'n8n-workflow';
import {
LoggerProxy as Logger,
@ -157,7 +158,7 @@ export class WorkflowExecute {
runPartialWorkflow(
workflow: Workflow,
runData: IRunData,
startNodes: string[],
startNodes: StartNodeData[],
destinationNode?: string,
pinData?: IPinData,
): PCancelable<IRun> {
@ -175,7 +176,7 @@ export class WorkflowExecute {
const waitingExecution: IWaitingForExecution = {};
const waitingExecutionSource: IWaitingForExecutionSource = {};
for (const startNode of startNodes) {
incomingNodeConnections = workflow.connectionsByDestinationNode[startNode];
incomingNodeConnections = workflow.connectionsByDestinationNode[startNode.name];
const incomingData: INodeExecutionData[][] = [];
let incomingSourceData: ITaskDataConnectionsSource | null = null;
@ -210,15 +211,13 @@ export class WorkflowExecute {
}
}
incomingSourceData.main.push({
previousNode: connection.node,
});
incomingSourceData.main.push(startNode.sourceData);
}
}
}
const executeData: IExecuteData = {
node: workflow.getNode(startNode) as INode,
node: workflow.getNode(startNode.name) as INode,
data: {
main: incomingData,
},

View File

@ -47,6 +47,7 @@ import {
type INodeProperties,
type NodeConnectionType,
type INodeCredentialsDetails,
type StartNodeData,
} from 'n8n-workflow';
import type { BulkCommand, Undoable } from '@/models/history';
import type { PartialBy, TupleToUnion } from '@/utils/typeHelpers';
@ -188,7 +189,7 @@ export interface IAiData {
export interface IStartRunData {
workflowData: IWorkflowData;
startNodes?: string[];
startNodes?: StartNodeData[];
destinationNode?: string;
runData?: IRunData;
pinData?: IPinData;

View File

@ -1,6 +1,7 @@
import type { IExecutionPushResponse, IExecutionResponse, IStartRunData } from '@/Interface';
import { mapStores } from 'pinia';
import { defineComponent } from 'vue';
import { get } from 'lodash-es';
import type {
IDataObject,
@ -10,6 +11,7 @@ import type {
IPinData,
IWorkflowBase,
Workflow,
StartNodeData,
} from 'n8n-workflow';
import {
NodeHelpers,
@ -37,8 +39,8 @@ export const consolidateRunDataAndStartNodes = (
runData: IRunData | null,
pinData: IPinData | undefined,
workflow: Workflow,
): { runData: IRunData | undefined; startNodes: string[] } => {
const startNodes: string[] = [];
): { runData: IRunData | undefined; startNodeNames: string[] } => {
const startNodeNames: string[] = [];
let newRunData: IRunData | undefined;
if (runData !== null && Object.keys(runData).length !== 0) {
@ -59,7 +61,7 @@ export const consolidateRunDataAndStartNodes = (
// When we hit a node which has no data we stop and set it
// as a start node the execution from and then go on with other
// direct input nodes
startNodes.push(parentNode);
startNodeNames.push(parentNode);
break;
}
if (runData[parentNode]) {
@ -75,7 +77,7 @@ export const consolidateRunDataAndStartNodes = (
}
}
return { runData: newRunData, startNodes };
return { runData: newRunData, startNodeNames };
};
export const workflowRun = defineComponent({
@ -243,18 +245,18 @@ export const workflowRun = defineComponent({
workflow,
);
const { startNodes } = consolidatedData;
const { startNodeNames } = consolidatedData;
let { runData: newRunData } = consolidatedData;
let executedNode: string | undefined;
if (
startNodes.length === 0 &&
startNodeNames.length === 0 &&
'destinationNode' in options &&
options.destinationNode !== undefined
) {
executedNode = options.destinationNode;
startNodes.push(options.destinationNode);
startNodeNames.push(options.destinationNode);
} else if ('triggerNode' in options && 'nodeData' in options) {
startNodes.push(
startNodeNames.push(
...workflow.getChildNodes(options.triggerNode, NodeConnectionType.Main, 1),
);
newRunData = {
@ -263,6 +265,25 @@ export const workflowRun = defineComponent({
executedNode = options.triggerNode;
}
const startNodes: StartNodeData[] = startNodeNames.map((name) => {
// Find for each start node the source data
let sourceData = get(runData, [name, 0, 'source', 0], null);
if (sourceData === null) {
const parentNodes = workflow.getParentNodes(name, NodeConnectionType.Main, 1);
const executeData = this.workflowHelpers.executeData(
parentNodes,
name,
NodeConnectionType.Main,
0,
);
sourceData = get(executeData, ['source', NodeConnectionType.Main, 0], null);
}
return {
name,
sourceData,
};
});
const startRunData: IStartRunData = {
workflowData,
runData: newRunData,
@ -288,7 +309,6 @@ export const workflowRun = defineComponent({
resultData: {
runData: newRunData || {},
pinData: workflowData.pinData,
startNodes,
workflowData,
},
} as IRunExecutionData,

View File

@ -1943,6 +1943,11 @@ export interface ISourceData {
previousNodeRun?: number; // If undefined "0" gets used
}
export interface StartNodeData {
name: string;
sourceData: ISourceData | null;
}
// The data for all the different kind of connections (like main) and all the indexes
export interface ITaskDataConnections {
// Key for each input type and because there can be multiple inputs of the same type it is an array