Skip to content

Commit

Permalink
restructure to follow good OOPS SOLID Practices
Browse files Browse the repository at this point in the history
  • Loading branch information
Yash Bansal committed Feb 13, 2023
1 parent 37b2dda commit a094d37
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 83 deletions.
3 changes: 2 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@
DATABASE_URL=mongodb://root:prisma@localhost:27017/streetDB?authSource=admin&retryWrites=false
KAFKA_PORT=9092
KAFKA_URL=localhost:9092
KAFKA_TOPIC=test-events
KAFKA_TOPIC=street-events
CONSUMER_NAME=street-consumer
9 changes: 9 additions & 0 deletions src/consumer/consumerService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export default interface ConsumerService {
topicName: string;
consumer: any;
consumerCount: Number;
subscribe: () => any;
consume: () => any;
connect: () => any;
disconnect: () => any;
}
52 changes: 0 additions & 52 deletions src/consumer/index.ts

This file was deleted.

65 changes: 65 additions & 0 deletions src/consumer/kafkaConsumerService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import ConsumerService from "./consumerService";
import { logLevel } from "kafkajs";
import { writeStreetInfo } from "../data/prismaClient";
import { StreetsService } from "../israeliStreets";
const { Kafka } = require('kafkajs');
const dotenv = require('dotenv');
dotenv.config();
const kafka = new Kafka({
clientId: process.env.CONSUMER_NAME,
brokers: [process.env.KAFKA_URL],
logLevel: logLevel.DEBUG
});

const consumerNumber = process.argv[2] || '1';

const logMessage = (counter, consumerName, topic, partition, message) => {
console.log(`received a new message number: ${counter} on ${consumerName}: `, {
topic,
partition,
message: {
offset: message.offset,
headers: message.headers,
value: message.value.toString()
},
});
};

class KafkaConsumer implements ConsumerService {
topicName: string;
consumer: any;
consumerCount: any;
constructor(topic_name) {
this.topicName = topic_name
this.consumer = kafka.consumer({groupId: 'street'});
this.consumerCount = 1;
}
async connect () {
await this.consumer.connect()
}
async subscribe () {
await this.consumer.subscribe({ topic: this.topicName })
}
async consume () {
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const { streetId } = JSON.parse(message.value.toString())
if (!streetId) {
return
}
const streetInfo = await StreetsService.getStreetInfoById(streetId)
const writeInfo = await writeStreetInfo(streetInfo)
logMessage(this.consumerCount, `streetsConsumer#${consumerNumber} ${writeInfo}`, topic, partition, message);
this.consumerCount++;
},
});
};
disconnect () {
this.consumer.disconnect();
};
}
// console.log(process.env)
const kafkaConsumer = new KafkaConsumer(process.env.KAFKA_TOPIC);
kafkaConsumer.connect()
kafkaConsumer.subscribe()
kafkaConsumer.consume()
15 changes: 11 additions & 4 deletions src/main.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import { StreetsService as streetsService, city as City} from './israeliStreets/index'
import Producer from './publisher/index';
const args = process.argv.slice(2);
import Producer from './publisher/kafkaPublisherService';
const cities = process.argv.slice(2);

args.forEach((city_name: City) => {
Producer.connect();

cities.forEach((city_name: City) => {
streetsService.getStreetsInCity(city_name)
.then(({city, streets}) => {
streets.forEach((street) => {
Producer(JSON.stringify(street))
Producer.produce(JSON.stringify(street))
})
})
.catch(err => console.log(err))
.finally(() => setTimeout(() => {
Producer.disconnect()
process.exit(0)
}, 5000))
})
26 changes: 0 additions & 26 deletions src/publisher/index.ts

This file was deleted.

36 changes: 36 additions & 0 deletions src/publisher/kafkaPublisherService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import PublisherService from './publisherService'
const { Kafka } = require('kafkajs');
const dotenv = require('dotenv');
dotenv.config();

const kafka = new Kafka({
clientId: process.env.CONSUMER_NAME,
brokers: [process.env.KAFKA_URL]
});


export class Producer implements PublisherService {
topicName: string;
producer: any;
constructor (topic_name) {
this.topicName = topic_name
this.producer = kafka.producer();
}
connect() {
this.producer.connect();
}
produce(message:string) {
console.log(message)
return this.producer.send({
topic: this.topicName,
messages: [
{ value: message },
],
});
}
disconnect() {
this.producer.disconnect();
}
}

export default new Producer(process.env.KAFKA_TOPIC);
7 changes: 7 additions & 0 deletions src/publisher/publisherService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export default interface PublisherService {
topicName: string;
producer: any;
produce: (message: string) => Promise<any>
connect: () => void
disconnect: () => void
}

0 comments on commit a094d37

Please sign in to comment.