-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
66 lines (50 loc) · 1.51 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
"""Main App
App to push messages into the redis queue
"""
from datetime import datetime
from json import dumps
import random
from time import sleep
from uuid import uuid4
import redis
from decouple import config
def redis_db():
try:
db = redis.Redis(
host=config("REDIS_HOST"),
port=config("REDIS_PORT", cast=int),
db=config("REDIS_DB_NUMBER", cast=int),
password=config("REDIS_PASSWORD"),
decode_responses=True
)
db.ping()
return db
except redis.exeptions.RedisError as e:
print(f"Erro ao conectar ao banco de dados Redis: {e}")
return None
def redis_queue_push(db: redis.Redis, message):
db.lpush(config("REDIS_QUEUE_NAME"), message)
def main(num_messages: int, delay: float = 1):
"""
Generates `num_messages` and pushes them to a Redis queue
:param num_messages:
:return:
"""
db = redis_db()
for i in range(num_messages):
#Generate message data
message = {
"id": str(uuid4()),
"ts": datetime.utcnow().isoformat(),
"data": {
"message_number": i,
"x": random.randrange(0, 100),
"y": random.randrange(0, 100),
},
}
message_json = dumps(message)
print(f"Sending message {i+1} (id={message['id']})")
redis_queue_push(db, message_json)
sleep(delay)
if __name__ == "__main__":
main(num_messages=30, delay= 0.1)