Added Kafka consumer and producer to store data in DB
sudiptab2100 committed Nov 19, 2023
1 parent 309b7c0 commit 2322eb7
Showing 1 changed file with 51 additions and 7 deletions.
58 changes: 51 additions & 7 deletions src/server.py
@@ -2,6 +2,10 @@
from pymongo import MongoClient
from datetime import datetime
from dateutil import parser
from kafka import KafkaProducer, KafkaConsumer
import json
from threading import Thread
from bson import ObjectId

app = FastAPI()

@@ -10,19 +14,59 @@
db = client["dyte"] # Replace with your actual database name
collection = db["dyte"]

class MongoEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, ObjectId):
            return str(obj)
        return json.JSONEncoder.default(self, obj)

# Kafka configuration
kafka_bootstrap_servers = 'localhost:9092'
kafka_topic = 'dyte-logs'

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=kafka_bootstrap_servers,
    value_serializer=lambda v: json.dumps(v, cls=MongoEncoder).encode('utf-8')
)

def kafka_producer_worker(data):
    producer.send(kafka_topic, value=data)

# Kafka Consumer
consumer = KafkaConsumer(
    kafka_topic,
    bootstrap_servers=kafka_bootstrap_servers,
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

def kafka_consumer_worker():
    for message in consumer:
        # Process the Kafka messages as needed
        print(f"Received message: {message.value}")
        data = message.value
        # Add timestamp field as a datetime object
        data["tObj"] = parser.parse(data["timestamp"])

        data['pRID'] = data['metadata']['parentResourceId']
        # Insert the data into MongoDB
        collection.insert_one(data)
        print("Stored in DB")

# Start Kafka Consumer in a separate thread
consumer_thread = Thread(target=kafka_consumer_worker)
consumer_thread.start()

@app.post("/")
async def handle_logs(data: dict):
    try:
        print("Received JSON data:")
        print(data)

        # Produce the data to Kafka in a separate thread
        kafka_producer_thread = Thread(target=kafka_producer_worker, args=(data,))
        kafka_producer_thread.start()

        return {"message": "JSON data received successfully"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing JSON data: {str(e)}")
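
A quick way to smoke-test this change end to end (not part of the commit): the sketch below assumes the app is served locally (e.g. uvicorn server:app --port 8000), a Kafka broker is reachable at localhost:9092 with topic auto-creation enabled, and MongoDB is at localhost:27017 (the MongoClient line is outside this diff, so the connection string is an assumption). The timestamp and metadata.parentResourceId keys are the two fields the consumer actually reads; level and message are hypothetical placeholders.

import time

import requests  # third-party: pip install requests
from pymongo import MongoClient

# Post a sample log to the FastAPI endpoint; the handler hands it to Kafka.
sample_log = {
    "level": "error",                                 # hypothetical field
    "message": "upstream connection refused",         # hypothetical field
    "timestamp": "2023-11-19T10:30:00Z",              # parsed into tObj by the consumer
    "metadata": {"parentResourceId": "server-1234"},  # copied into pRID by the consumer
}
resp = requests.post("http://localhost:8000/", json=sample_log)
print(resp.json())  # expect: {'message': 'JSON data received successfully'}

# Give the consumer thread a moment, then check that the document landed in Mongo.
time.sleep(2)  # crude; a polling loop would be more robust
client = MongoClient("mongodb://localhost:27017")  # assumed connection string
print(client["dyte"]["dyte"].find_one({"pRID": "server-1234"}))

If everything is wired up, the server process should also print "Stored in DB" once the consumer thread has inserted the document.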