forked from DeepInsight-AI/DeepBI
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hangoutschat.py
109 lines (94 loc) · 3.47 KB
/
hangoutschat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import logging
import requests
from bi.destinations import *
from bi.utils import json_dumps
class HangoutsChat(BaseDestination):
    """Alert destination that posts a card to a Google Hangouts Chat webhook."""

    @classmethod
    def name(cls):
        """Return the display name of this destination."""
        return "Google Hangouts Chat"

    @classmethod
    def type(cls):
        """Return the identifier string of this destination type."""
        return "hangouts_chat"

    @classmethod
    def configuration_schema(cls):
        """Return the schema describing this destination's options.

        ``url`` is listed as both required and secret; ``icon_url`` is
        optional.
        """
        webhook_field = {
            "type": "string",
            "title": "Webhook URL (get it from the room settings)",
        }
        icon_field = {
            "type": "string",
            "title": "Icon URL (32x32 or multiple, png format)",
        }
        return {
            "type": "object",
            "properties": {"url": webhook_field, "icon_url": icon_field},
            "secret": ["url"],
            "required": ["url"],
        }

    @classmethod
    def icon(cls):
        """Return the icon identifier used for this destination."""
        return "fa-bolt"

    def notify(self, alert, query, user, new_state, app, host, options):
        """Post a card describing the alert's new state to the webhook.

        Args:
            alert: Object whose ``custom_subject``, ``name`` and
                ``custom_body`` attributes are read to fill the card.
            query: Object whose ``id`` is used to build the query link.
            user: Not used by this destination.
            new_state: ``"triggered"`` or ``"ok"``; any other value yields
                a generic "unable to determine status" message.
            app: Not used by this destination.
            host: Base URL for the "OPEN QUERY" button; the button is
                omitted when this is falsy.
            options: Mapping providing ``url`` (the webhook) and an
                optional ``icon_url``.

        Returns:
            None. A non-200 response is logged as an error, and any
            exception raised along the way is logged and not re-raised.
        """
        try:
            # Start from the fallback text and override for known states.
            status_text = (
                "Unable to determine status. Check Query and Alert configuration."
            )
            if new_state == "triggered":
                status_text = '<b><font color="#c0392b">Triggered</font></b>'
            elif new_state == "ok":
                status_text = '<font color="#27ae60">Went back to normal</font>'

            card_title = alert.custom_subject or alert.name

            # Assemble the card from its parts; the widgets list of the
            # first section is kept by reference so the button can be
            # attached to it further down.
            status_widgets = [{"textParagraph": {"text": status_text}}]
            card_header = {"title": card_title}
            card_sections = [{"widgets": status_widgets}]

            extra_text = alert.custom_body
            if extra_text:
                card_sections.append(
                    {"widgets": [{"textParagraph": {"text": extra_text}}]}
                )

            icon_url = options.get("icon_url")
            if icon_url:
                card_header["imageUrl"] = icon_url

            # Hangouts Chat will create a blank card if an invalid URL (no hostname) is posted.
            if host:
                query_link = "{host}/queries/{query_id}".format(
                    host=host, query_id=query.id
                )
                open_query_button = {
                    "textButton": {
                        "text": "OPEN QUERY",
                        "onClick": {"openLink": {"url": query_link}},
                    }
                }
                status_widgets.append({"buttons": [open_query_button]})

            payload = {"cards": [{"header": card_header, "sections": card_sections}]}

            response = requests.post(
                options.get("url"),
                data=json_dumps(payload),
                headers={"Content-Type": "application/json; charset=UTF-8"},
                timeout=5.0,
            )
            status = response.status_code
            if status != 200:
                logging.error(
                    "webhook send ERROR. status_code => {status}".format(status=status)
                )
        except Exception:
            # Best-effort delivery: record the failure with its traceback
            # instead of propagating it to the caller.
            logging.exception("webhook send ERROR.")
# Hand the destination class to `register` at import time.
# NOTE(review): `register` is presumably provided by the
# `from bi.destinations import *` star import — confirm.
register(HangoutsChat)