Skip to content

Commit

Permalink
connect websocket server to the database
Browse files Browse the repository at this point in the history
  • Loading branch information
zarifmahfuz committed Feb 13, 2022
1 parent cb7c8ce commit f308296
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 111 deletions.
74 changes: 74 additions & 0 deletions websocket/chat_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from datetime import datetime
from itertools import cycle
import socket
import threading
from message_handler import MessageHandler, ChatMessage

from utils import mesg_index

class ChatQueue(object):
"""
Maintains a queue of chat messages and exposes read and write operations to the queue.
"""
def __init__(self, room_id: str, max_len: int, message_handler: MessageHandler, max_index: int = 100):
self.cyclic_count = cycle(range(max_index))
self.current_idx = -1
self.max_idx = max_index
self.room_id = room_id
self.messages = []
self.max_len = max_len

# fetch # max_len most recent messages for this room from the database
self.message_handler = message_handler
self.messages = self.message_handler.get_most_recent_messages(room_id, max_len, self.cyclic_count)

# test - should print (max_len) => test passed
# self.current_idx = next(self.cyclic_count)
# print(f'current_idx: {self.current_idx}')

self.read_cnt = 0
self.write_mtx = threading.Lock()
self.read_cnt_mtx = threading.Lock()


def reader(self, last_read: int) -> list:
# only one thread is allowed to change the read count at a time
self.read_cnt_mtx.acquire()
self.read_cnt += 1
if (self.read_cnt == 1):
# if this is the first reader, make sure that no writer is writing to the queue
self.write_mtx.acquire()
self.read_cnt_mtx.release()

# perform read
if (last_read == self.current_idx):
response = None
else:
idx_to_read_from = mesg_index(self.messages[0].msg_idx, last_read, self.current_idx, self.max_idx)
response = self.messages[idx_to_read_from:]

self.read_cnt_mtx.acquire()
self.read_cnt -= 1
if (self.read_cnt == 0):
# if this is the last reader, release the write mutex so that writer is able to write now
self.write_mtx.release()
self.read_cnt_mtx.release()

return response

def writer(self, data: dict) -> None:
# no other readers allowed to read and no other writers allowed to write
# enter critical section~!
self.write_mtx.acquire()
self.current_idx = next(self.cyclic_count)
self.messages.append(ChatMessage(self.current_idx, datetime.utcnow(), data["content"], data["sender"], self.room_id))
if (len(self.messages) > self.max_len):
del self.messages[0]
# leave critical section~!
self.write_mtx.release()


if __name__ == "__main__":
# test code
message_handler = MessageHandler()
cq = ChatQueue("Room 1", 10, message_handler)
111 changes: 111 additions & 0 deletions websocket/message_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# from websocket.database_manager import rooms_collection, messages_collection
from this import d
from database_manager import rooms_collection, messages_collection
from datetime import datetime
from utils import get_date_repr
from itertools import cycle
import pymongo
import pprint

class ChatMessage(object):
"""
Object that holds the chat message.
"""
def __init__(self, msg_idx: int, timestamp: datetime, content: str, sender: str, room_id: str):
self.msg_idx = msg_idx
self.timestamp = timestamp
self.content = content
self.sender = sender
self.room_id = room_id

def get_data_for_db(self):
"""
Returns a dictionary representing a document in the Messages collection.
"""
return {"timestamp": self.timestamp, "sender": self.sender, "content": self.content, "room_id": self.room_id}

def gata_data_for_pres(self) -> str:
"""
Returns a string representation of a chat message.
"""
return get_date_repr(self.timestamp) + ", " + self.sender + ": " + self.content

def __str__(self):
ret = "{\n\t" + "timestamp: " + get_date_repr(self.timestamp) + ",\n\t" + "content: " + self.content + ",\n\t"\
+ "sender: " + self.sender + ",\n\t" + "room_id: " + self.room_id + "\r\n}"
return ret


class MessageHandler(object):
def __init__(self):
pass

def write_to_db(self, data):
"""
Writes given data into the Messages collection.
Parameters:
data (ChatMessage or list(ChatMessage)): messages to be inserted
"""
# executing write operations in batches reduces the number of network round trips, increasing write throughput
if (isinstance(data, list)):
# assuming that the list contains ChatMessage objects
req = []
for chat_message in data:
req.append(chat_message.get_data_for_db())
result = messages_collection.insert_many(req).inserted_ids
print(f'# message write requests: {len(data)}')
print(f'# actual writes = {len(result)}')
elif (isinstance(data, ChatMessage)):
data = data.get_data_for_db()
result = messages_collection.insert_one(data)
else:
print("Invalid data given to write_to_db")

def get_most_recent_messages(self, room_id: str, count: int, cyclic_count: cycle):
"""
Gets the most recent messages in a particular chat room.
"""
# the _id field in MongoDB has a timestamp component to it, so the collection should be sorted by
# insertion order
# if we have a timestamps [1,2,3,4,5,6,7,8,9] and we want to get the 5 most recent messages
# the first sort returns [9,8,7,6,5] and the second sort will return [5,6,7,8,9]
result_cursor = messages_collection.aggregate([
{"$match": {
"room_id": room_id
}},
{"$sort": {
"_id": -1
}},
{"$limit": count},
{"$sort": {
"_id": 1
}}
])
# build a list of chat message objects
chat_messages = []
i = 0
for doc in result_cursor:
chat_message = ChatMessage(next(cyclic_count), doc["timestamp"], doc["content"], doc["sender"], doc["room_id"])
chat_messages.append(chat_message)
i += 1
return chat_messages


if __name__ == "__main__":
# TEST CODE
message_handler = MessageHandler()
content = ["hello", "hi", "hey", "good morning", "afternoon", "evening", "night", "summer", "winter", "rain",
"autumn", "i love summer"]
data = []
for msg in content:
chat_msg = ChatMessage(0, datetime.utcnow(), msg, "zarifmahfuz", "Room 1")
data.append(chat_msg)

# post the data
# message_handler.write_to_db(data)

messages = message_handler.get_most_recent_messages("Room 1", 10)
for m in messages:
print(m)
# messages_collection.delete_many({})
3 changes: 2 additions & 1 deletion websocket/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pymongo
jsonschema
jsonschema
requests
23 changes: 23 additions & 0 deletions websocket/room_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from database_manager import rooms_collection
import pymongo

class RoomManager(object):
"""
Handles the creation of rooms.
"""
def __init__(object):
pass

def create_room(self, room_id: str) -> str:
"""
Creates a new room in the Rooms collection, if it does not already exist.
Returns:
room_id: indicates that the room was created or already exists.
"""
doc = {"_id": room_id}
try:
room_id = rooms_collection.insert_one(doc).inserted_id
except pymongo.errors.DuplicateKeyError:
pass
return room_id
Loading

0 comments on commit f308296

Please sign in to comment.