
fix(kafkaTrigger Node): fix kafka trigger not working with default max requests value

ruanjiefeng 2022-09-05 19:11:25 +08:00 committed by GitHub
parent 9314086b6a
commit 71cae90679
3 changed files with 64 additions and 2 deletions


@@ -11,6 +11,7 @@ export class Kafka implements ICredentialType {
			type: 'string',
			default: '',
			placeholder: 'my-app',
			hint: 'Will not affect the connection, but will be used to identify the client in the Kafka server logs. Read more <a href="https://kafka.apache.org/documentation/#design_quotasgroups">here</a>',
		},
		{
			displayName: 'Brokers',


@@ -11,7 +11,11 @@ import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import { IExecuteFunctions } from 'n8n-core';

import {
	ICredentialDataDecryptedObject,
	ICredentialsDecrypted,
	ICredentialTestFunctions,
	IDataObject,
	INodeCredentialTestResult,
	INodeExecutionData,
	INodeType,
	INodeTypeDescription,
@@ -35,6 +39,7 @@ export class Kafka implements INodeType {
			{
				name: 'kafka',
				required: true,
				testedBy: 'kafkaConnectionTest',
			},
		],
		properties: [
@@ -185,6 +190,56 @@
		],
	};

	methods = {
		credentialTest: {
			async kafkaConnectionTest(
				this: ICredentialTestFunctions,
				credential: ICredentialsDecrypted,
			): Promise<INodeCredentialTestResult> {
				const credentials = credential.data as ICredentialDataDecryptedObject;
				try {
					const brokers = ((credentials.brokers as string) || '')
						.split(',')
						.map((item) => item.trim()) as string[];
					const clientId = credentials.clientId as string;
					const ssl = credentials.ssl as boolean;
					const config: KafkaConfig = {
						clientId,
						brokers,
						ssl,
					};
					if (credentials.authentication === true) {
						if (!(credentials.username && credentials.password)) {
							throw Error('Username and password are required for authentication');
						}
						config.sasl = {
							username: credentials.username as string,
							password: credentials.password as string,
							mechanism: credentials.saslMechanism as string,
						} as SASLOptions;
					}
					const kafka = new apacheKafka(config);
					await kafka.admin().connect();
					await kafka.admin().disconnect();
					return {
						status: 'OK',
						message: 'Authentication successful',
					};
				} catch (error) {
					return {
						status: 'Error',
						message: error.message,
					};
				}
			},
		},
	};

	async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
		const items = this.getInputData();
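
The new kafkaConnectionTest boils down to a connect/disconnect round trip against the kafkajs admin client, built from the stored credentials. A minimal standalone sketch of the same check, assuming the kafkajs package and placeholder broker and SASL values rather than the node's real credential wiring:

import { Kafka, KafkaConfig, SASLOptions } from 'kafkajs';

// Placeholder connection details for illustration only.
const config: KafkaConfig = {
	clientId: 'my-app',
	brokers: ['localhost:9092'],
	ssl: false,
};

// Optional SASL block, mirroring what the credential test assembles when
// authentication is enabled in the Kafka credentials.
const sasl: SASLOptions = {
	mechanism: 'plain',
	username: 'user',
	password: 'pass',
};

async function kafkaConnectionCheck(): Promise<boolean> {
	const kafka = new Kafka({ ...config, sasl });
	const admin = kafka.admin();
	try {
		// connect() rejects if the brokers are unreachable or authentication fails,
		// which the node reports as a failed credential test.
		await admin.connect();
		return true;
	} catch {
		return false;
	} finally {
		await admin.disconnect();
	}
}

The node itself returns { status: 'OK' } or { status: 'Error', message } instead of a boolean, but the connectivity check is the same.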


@@ -114,7 +114,7 @@ export class KafkaTrigger implements INodeType {
					displayName: 'Max Number of Requests',
					name: 'maxInFlightRequests',
					type: 'number',
					default: 0,
					default: 1,
					description:
						'Max number of requests that may be in progress at any time. If falsey then no limit.',
				},
@@ -202,9 +202,15 @@
		const kafka = new apacheKafka(config);

		const maxInFlightRequests = (
			this.getNodeParameter('options.maxInFlightRequests', null) === 0
				? null
				: this.getNodeParameter('options.maxInFlightRequests', null)
		) as number;

		const consumer = kafka.consumer({
			groupId,
			maxInFlightRequests: this.getNodeParameter('options.maxInFlightRequests', 0) as number,
			maxInFlightRequests,
			sessionTimeout: this.getNodeParameter('options.sessionTimeout', 30000) as number,
			heartbeatInterval: this.getNodeParameter('options.heartbeatInterval', 3000) as number,
		});
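
The underlying problem: the option's description promises "if falsey then no limit", but the previous default of 0 was forwarded verbatim to kafka.consumer(), where it evidently did not behave as "no limit" and the trigger stopped consuming. The patch raises the UI default to 1 and maps an explicit 0 to null before handing it to kafkajs, whose documented default for maxInFlightRequests is null, meaning unlimited. A rough sketch of that normalization with placeholder values (the helper name, group, and brokers are illustrative, not the node's actual code):

import { Kafka } from 'kafkajs';

// Illustrative helper (not part of the commit): reproduce the inline mapping
// the patch performs, where a falsey 0 becomes null ("no limit" for kafkajs)
// and any other value is passed through unchanged.
function normalizeMaxInFlightRequests(value: number | null): number | null {
	return value === 0 ? null : value;
}

// Placeholder wiring showing where the normalized value ends up.
const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({
	groupId: 'my-group',
	maxInFlightRequests: normalizeMaxInFlightRequests(1) ?? undefined,
	sessionTimeout: 30000,
	heartbeatInterval: 3000,
});

In the trigger itself the value still comes from getNodeParameter and is cast, so a null flows through to kafka.consumer() only when the user explicitly sets 0.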