-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
30 lines (25 loc) · 917 Bytes
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from multiprocessing import Process, Queue
from config import TOPICS, NUM_MONGO_PROCESSES
from kafka_consumer import KafkaConsumer
from scylladb_processor import InsertIntoScylla
def main():
    """Run the Kafka-to-ScyllaDB pipeline until it finishes or is interrupted.

    Spawns one process targeting ``KafkaConsumer`` (given ``TOPICS`` and a
    shared queue) and ``NUM_MONGO_PROCESSES`` processes targeting
    ``InsertIntoScylla`` (given the same queue), then blocks until every
    child has exited. On ``KeyboardInterrupt`` all children are terminated
    and reaped before returning.
    """
    # Shared queue handed to both the consumer and every writer process.
    message_queue = Queue()

    kafka_process = Process(target=KafkaConsumer, args=(TOPICS, message_queue))
    kafka_process.start()

    # NOTE(review): the constant is named NUM_MONGO_PROCESSES but it sizes the
    # pool of ScyllaDB writers here -- presumably a leftover name; confirm in
    # config before renaming.
    scylla_processes = []
    for _ in range(NUM_MONGO_PROCESSES):
        scylla_process = Process(target=InsertIntoScylla, args=(message_queue,))
        scylla_process.start()
        scylla_processes.append(scylla_process)

    workers = [kafka_process, *scylla_processes]
    try:
        # Block on the children instead of spinning in ``while True: pass``,
        # which kept a CPU core fully busy while the parent did nothing.
        for worker in workers:
            worker.join()
    except KeyboardInterrupt:
        print("KeyboardInterrupt received. Stopping processes...")
        # Signal every child first so they shut down concurrently, then reap
        # all of them (including the Kafka process) to avoid leaving zombies.
        for worker in workers:
            worker.terminate()
        for worker in workers:
            worker.join()
# Script entry point: run the pipeline only when this file is executed
# directly, not when it is imported (multiprocessing may re-import this
# module in child processes on spawn-based platforms).
if __name__ == "__main__":
    main()