Skip to content

Commit

Permalink
fix(MQTT Node): Close connection if connection attempt fails (n8n-io#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tomi authored Sep 18, 2024
1 parent 0a317b7 commit ee7147c
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 10 deletions.
5 changes: 5 additions & 0 deletions packages/nodes-base/nodes/MQTT/GenericFunctions.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { connect, type IClientOptions, type MqttClient } from 'mqtt';
import { ApplicationError, randomString } from 'n8n-workflow';

import { formatPrivateKey } from '@utils/utilities';

interface BaseMqttCredential {
Expand Down Expand Up @@ -62,6 +63,10 @@ export const createClient = async (credentials: MqttCredential): Promise<MqttCli
const onError = (error: Error) => {
client.removeListener('connect', onConnect);
client.removeListener('error', onError);
// mqtt client has an automatic reconnect mechanism that will
// keep trying to reconnect until it succeeds unless we
// explicitly close the client
client.end();
reject(new ApplicationError(error.message));
};

Expand Down
7 changes: 5 additions & 2 deletions packages/nodes-base/nodes/MQTT/Mqtt.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
type INodeType,
type INodeTypeDescription,
NodeConnectionType,
ensureError,
} from 'n8n-workflow';

import { createClient, type MqttCredential } from './GenericFunctions';
Expand Down Expand Up @@ -116,10 +117,12 @@ export class Mqtt implements INodeType {
try {
const client = await createClient(credentials);
client.end();
} catch (error) {
} catch (e) {
const error = ensureError(e);

return {
status: 'Error',
message: (error as Error).message,
message: error.message,
};
}
return {
Expand Down
44 changes: 36 additions & 8 deletions packages/nodes-base/nodes/MQTT/test/GenericFunctions.test.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import { MqttClient } from 'mqtt';
import { mock } from 'jest-mock-extended';
import { MqttClient } from 'mqtt';
import { ApplicationError } from 'n8n-workflow';

import { createClient, type MqttCredential } from '../GenericFunctions';

describe('createClient', () => {
const mockConnect = jest.spyOn(MqttClient.prototype, 'connect').mockImplementation(function (
this: MqttClient,
) {
setImmediate(() => this.emit('connect', mock()));
return this;
});

beforeEach(() => jest.clearAllMocks());

it('should create a client with minimal credentials', async () => {
const mockConnect = jest.spyOn(MqttClient.prototype, 'connect').mockImplementation(function (
this: MqttClient,
) {
setImmediate(() => this.emit('connect', mock()));
return this;
});

const credentials = mock<MqttCredential>({
protocol: 'mqtt',
host: 'localhost',
Expand All @@ -35,4 +36,31 @@ describe('createClient', () => {
clientId: 'testClient',
});
});

it('should reject with ApplicationError on connection error and close connection', async () => {
const mockConnect = jest.spyOn(MqttClient.prototype, 'connect').mockImplementation(function (
this: MqttClient,
) {
setImmediate(() => this.emit('error', new Error('Connection failed')));
return this;
});
const mockEnd = jest.spyOn(MqttClient.prototype, 'end').mockImplementation();

const credentials: MqttCredential = {
protocol: 'mqtt',
host: 'localhost',
port: 1883,
clean: true,
clientId: 'testClientId',
username: 'testUser',
password: 'testPass',
ssl: false,
};

const clientPromise = createClient(credentials);

await expect(clientPromise).rejects.toThrow(ApplicationError);
expect(mockConnect).toBeCalledTimes(1);
expect(mockEnd).toBeCalledTimes(1);
});
});

0 comments on commit ee7147c

Please sign in to comment.