Skip to content

Commit

Permalink
Prettified code
Browse files Browse the repository at this point in the history
  • Loading branch information
TalAter committed Sep 29, 2018
1 parent 62ba099 commit 881dc5c
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 47 deletions.
107 changes: 65 additions & 42 deletions src/KafkaNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,28 @@ import sdkLogger from './sdkLogger';

export default class Kafka extends KafkaBase implements IKafka {
private async getKafkaClient(config: IConfig): Promise<KafkaClient> {
return retryPromise(currentAttempt => new Promise<KafkaClient>((resolve, reject) => {
const client = new KafkaClient({ kafkaHost: config.kafkaSeedUrls[0] });
sdkLogger(`Kafka connecting... ${currentAttempt > 1 ? `${currentAttempt} try` : ''}`);
client.connect();
client.on('ready', () => {
sdkLogger(`Kafka connected`);
resolve(client);
});
client.on('error', err => {
sdkLogger(`Kafka connection error ${err}`);
reject(err);
});
}));
return retryPromise(
currentAttempt =>
new Promise<KafkaClient>((resolve, reject) => {
const client = new KafkaClient({
kafkaHost: config.kafkaSeedUrls[0],
});
sdkLogger(
`Kafka connecting... ${
currentAttempt > 1 ? `${currentAttempt} try` : ''
}`,
);
client.connect();
client.on('ready', () => {
sdkLogger(`Kafka connected`);
resolve(client);
});
client.on('error', err => {
sdkLogger(`Kafka connection error ${err}`);
reject(err);
});
}),
);
}

private async getProducer(config: IConfig): Promise<Producer> {
Expand All @@ -45,22 +54,29 @@ export default class Kafka extends KafkaBase implements IKafka {
}

public async createTopic(topicId: string, config: IConfig): Promise<void> {
return retryPromise(currentAttempt => new Promise<void>(async (resolve, reject) => {
const client = await this.getKafkaClient(config);
sdkLogger(`Kafka creating topic ${topicId}... ${currentAttempt > 1 ? `${currentAttempt} try` : ''}`);
(client as any).createTopics(
[{ topic: topicId, partitions: 1, replicationFactor: 1 }],
(err: any, data: any) => {
if (err) {
sdkLogger(`Kafka error creating topic ${topicId}`);
reject(err);
} else {
sdkLogger(`Kafka topic created ${topicId}`);
resolve();
}
},
);
}));
return retryPromise(
currentAttempt =>
new Promise<void>(async (resolve, reject) => {
const client = await this.getKafkaClient(config);
sdkLogger(
`Kafka creating topic ${topicId}... ${
currentAttempt > 1 ? `${currentAttempt} try` : ''
}`,
);
(client as any).createTopics(
[{ topic: topicId, partitions: 1, replicationFactor: 1 }],
(err: any, data: any) => {
if (err) {
sdkLogger(`Kafka error creating topic ${topicId}`);
reject(err);
} else {
sdkLogger(`Kafka topic created ${topicId}`);
resolve();
}
},
);
}),
);
}

public sendMessage(
Expand All @@ -75,19 +91,26 @@ export default class Kafka extends KafkaBase implements IKafka {
payloads: ProduceRequest[],
config: IConfig,
): Promise<void> {
return retryPromise(currentAttempt => new Promise<void>(async (resolve, reject) => {
const producer = await this.getProducer(config);
sdkLogger(`Kafka sending ${JSON.stringify(payloads)}... ${currentAttempt > 1 ? `${currentAttempt} try` : ''}`);
producer.send(payloads, (err: any, data: any) => {
if (err) {
sdkLogger(`Kafka error sending ${JSON.stringify(payloads)}`);
reject(err);
} else {
sdkLogger(`Kafka sent ${JSON.stringify(payloads)}`);
resolve();
}
});
}));
return retryPromise(
currentAttempt =>
new Promise<void>(async (resolve, reject) => {
const producer = await this.getProducer(config);
sdkLogger(
`Kafka sending ${JSON.stringify(payloads)}... ${
currentAttempt > 1 ? `${currentAttempt} try` : ''
}`,
);
producer.send(payloads, (err: any, data: any) => {
if (err) {
sdkLogger(`Kafka error sending ${JSON.stringify(payloads)}`);
reject(err);
} else {
sdkLogger(`Kafka sent ${JSON.stringify(payloads)}`);
resolve();
}
});
}),
);
}

public sendParams(
Expand Down
10 changes: 5 additions & 5 deletions src/sdkLogger.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
function sdkLogger(message?: any, ...optionalParams: any[]) {
const sdkDebugLog = process.env.SDK_DEBUG_LOG === 'true';
if (sdkDebugLog) {
// tslint:disable-next-line:no-console
console.log(message, ...optionalParams);
}
const sdkDebugLog = process.env.SDK_DEBUG_LOG === 'true';
if (sdkDebugLog) {
// tslint:disable-next-line:no-console
console.log(message, ...optionalParams);
}
}

export default sdkLogger;

0 comments on commit 881dc5c

Please sign in to comment.