forked from andy71/rpilocator-rss-feed
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrpilocator_telegram.py
105 lines (77 loc) · 2.77 KB
/
rpilocator_telegram.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import calendar
import datetime
import html
import os
import sys
import time

import feedparser
import telegram
# URL of the RSS feed to poll; the FEED_URL environment variable overrides it.
FEED_URL = os.environ.get('FEED_URL', 'https://rpilocator.com/feed/')

# Telegram credentials. Both are mandatory: an unset or empty value aborts
# start-up with a ValueError.
TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN')
if not TELEGRAM_BOT_TOKEN:
    raise ValueError('Missing TELEGRAM_BOT_TOKEN environment variable.')

TELEGRAM_CHAT_ID = os.environ.get('TELEGRAM_CHAT_ID')
if not TELEGRAM_CHAT_ID:
    raise ValueError('Missing TELEGRAM_CHAT_ID environment variable.')

# Heading placed at the top of every alert message.
MESSAGE_TITLE = 'xlocator Stock Alert'

# User-Agent string passed to feedparser when fetching the feed.
USER_AGENT = 'xlocator feed alert'
# Create the message body
def formatMessage(entry, title=None):
    """Build the HTML-formatted Telegram message text for one feed entry.

    Args:
        entry: Feed entry exposing ``title`` and ``link`` attributes.
        title: Heading for the message. It is inserted verbatim, so it may
            contain Telegram HTML markup. Defaults to ``MESSAGE_TITLE``.

    Returns:
        The message text: bold underlined heading, blank line, entry title,
        blank line, entry link.
    """
    if title is None:
        title = MESSAGE_TITLE
    # The text is sent with parse_mode='HTML', so feed-supplied strings must
    # be escaped: a stray '<' or '&' would otherwise break the markup.
    lines = [
        f"<b><u>{title}</u></b>",
        "",
        html.escape(str(entry.title), quote=False),
        "",
        html.escape(str(entry.link), quote=False),
    ]
    return '\n'.join(lines)
# Create the Telegram client, then announce start-up: first as a silent
# chat message, then on stdout.
bot = telegram.Bot(token=TELEGRAM_BOT_TOKEN)
_startup_notice = f'Starting rpilocator notifier with FEED_URL={FEED_URL}...'
bot.send_message(
    chat_id=TELEGRAM_CHAT_ID,
    text=_startup_notice,
    disable_notification=True,
)
print(_startup_notice)
# Send a Telegram message
def sendMessage(message):
    """Send *message* to the configured Telegram chat, parsed as HTML.

    Args:
        message: Message text; sent with ``parse_mode='HTML'``.

    Returns:
        True if the call returned a ``telegram.Message``; False if it
        returned anything else or raised an exception.
    """
    try:
        # Use the snake_case method name, as the start-up notification does,
        # rather than the camelCase alias.
        result_msg = bot.send_message(
            chat_id=TELEGRAM_CHAT_ID, text=message, parse_mode='HTML')
    except Exception as err:
        # Best effort: report the failure and let the caller carry on.
        print('Unhandled exception while sending telegram message: %s' %
              err, file=sys.stderr)
        return False
    # Check against the public class name instead of the submodule path.
    return isinstance(result_msg, telegram.Message)
def updateTime(f: "feedparser.FeedParserDict") -> datetime.datetime:
    """Return the feed's last-update time as a naive local datetime.

    feedparser exposes parsed dates as UTC ``struct_time`` values, so the
    conversion uses ``calendar.timegm`` (which interprets its argument as
    UTC) rather than ``time.mktime`` (which interprets it as local time and
    shifts the result by the UTC offset). The result is directly comparable
    with ``datetime.datetime.now()``.

    Args:
        f: Parsed feed; ``f.feed.updated_parsed`` must be present.

    Returns:
        The feed update time as a naive datetime in local time.

    Raises:
        AttributeError: If the feed carries no ``updated_parsed`` value.
    """
    return datetime.datetime.fromtimestamp(
        calendar.timegm(f.feed.updated_parsed))
# IDs of feed entries that have already been handled. A set keeps the
# membership test O(1) as the collection grows over a long run.
control = set()

# Initial fetch. A failure here is left fatal on purpose: starting with an
# empty baseline would report every entry of the next good fetch as new.
f = feedparser.parse(FEED_URL, agent=USER_AGENT)
last_find = datetime.datetime.fromtimestamp(0)
last_feed_update = updateTime(f)

# Everything already in the feed at start-up counts as seen.
control.update(entry.id for entry in f.entries)

# Only wait 30 seconds after initial run.
time.sleep(30)

while True:
    # Fetch the feed again, and again, and again...
    f = feedparser.parse(FEED_URL, agent=USER_AGENT)
    try:
        last_feed_update = updateTime(f)
    except (AttributeError, TypeError) as err:
        # A fetch without a usable update time must not stop the notifier;
        # keep the previous value and try again next cycle.
        print(f'Feed has no usable update time: {err}', file=sys.stderr)

    # Compare feed entries to the seen set and send a message for each
    # entry that is new.
    for entry in f.entries:
        if entry.id in control:
            continue
        print('Found:', entry.title)
        # Mark the entry as seen only once the alert went out, so a failed
        # send is retried on the next cycle instead of being lost.
        if sendMessage(formatMessage(entry)):
            control.add(entry.id)
            last_find = datetime.datetime.now()

    now = datetime.datetime.now()
    print(f'Time since last find: {now - last_find}; '
          f'time since last feed update: {now - last_feed_update}.')
    time.sleep(59)