forked from eNMS-automation/eNMS
-
Notifications
You must be signed in to change notification settings - Fork 0
/
scheduler.py
126 lines (111 loc) · 4.57 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from apscheduler.jobstores.base import JobLookupError
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
from flask import Flask, jsonify, request
from json import load
from logging.config import dictConfig
from os import getenv
from pathlib import Path
from requests import post
from requests.auth import HTTPBasicAuth
class Scheduler(Flask):
days = {
"0": "sun",
"1": "mon",
"2": "tue",
"3": "wed",
"4": "thu",
"5": "fri",
"6": "sat",
"7": "sun",
"*": "*",
}
seconds = {"seconds": 1, "minutes": 60, "hours": 3600, "days": 86400}
def __init__(self):
super().__init__(__name__)
with open(Path.cwd().parent / "setup" / "scheduler.json", "r") as file:
self.settings = load(file)
dictConfig(self.settings["logging"])
self.configure_scheduler()
self.register_routes()
@staticmethod
def aps_date(date):
if not date:
return
date = datetime.strptime(date, "%d/%m/%Y %H:%M:%S")
return datetime.strftime(date, "%Y-%m-%d %H:%M:%S")
def configure_scheduler(self):
self.scheduler = BackgroundScheduler(self.settings["config"])
self.scheduler.start()
def register_routes(self):
@self.route("/delete_job/<job_id>", methods=["POST"])
def delete_job(job_id):
if self.scheduler.get_job(job_id):
self.scheduler.remove_job(job_id)
return jsonify(True)
@self.route("/next_runtime/<task_id>")
def next_runtime(task_id):
job = self.scheduler.get_job(task_id)
if job and job.next_run_time:
return jsonify(job.next_run_time.strftime("%Y-%m-%d %H:%M:%S"))
return jsonify("Not Scheduled")
@self.route("/schedule", methods=["POST"])
def schedule():
if request.json["mode"] in ("resume", "schedule"):
result = self.schedule_task(request.json["task"])
if not result:
return jsonify({"alert": "Cannot schedule in the past."})
else:
return jsonify({"response": "Task resumed.", "active": True})
else:
try:
self.scheduler.pause_job(request.json["task"]["id"])
return jsonify({"response": "Task paused."})
except JobLookupError:
return jsonify({"alert": "There is no such job scheduled."})
@self.route("/time_left/<task_id>")
def time_left(task_id):
job = self.scheduler.get_job(task_id)
if job and job.next_run_time:
delta = job.next_run_time.replace(tzinfo=None) - datetime.now()
hours, remainder = divmod(delta.seconds, 3600)
minutes, seconds = divmod(remainder, 60)
days = f"{delta.days} days, " if delta.days else ""
return jsonify(f"{days}{hours}h:{minutes}m:{seconds}s")
return jsonify("Not Scheduled")
@staticmethod
def run_service(task_id):
post(
f"{getenv('ENMS_ADDR')}/rest/run_task/{task_id}",
json={},
auth=HTTPBasicAuth(getenv("ENMS_USER"), getenv("ENMS_PASSWORD")),
verify=int(getenv("VERIFY_CERTIFICATE", 1)),
)
def schedule_task(self, task):
if task["scheduling_mode"] == "cron":
crontab = task["crontab_expression"].split()
crontab[-1] = ",".join(self.days[day] for day in crontab[-1].split(","))
trigger = {"trigger": CronTrigger.from_crontab(" ".join(crontab))}
elif task["frequency"]:
trigger = {
"trigger": "interval",
"start_date": self.aps_date(task["start_date"]),
"end_date": self.aps_date(task["end_date"]),
"seconds": int(task["frequency"])
* self.seconds[task["frequency_unit"]],
}
else:
trigger = {"trigger": "date", "run_date": self.aps_date(task["start_date"])}
if not self.scheduler.get_job(task["id"]):
job = self.scheduler.add_job(
id=str(task["id"]),
replace_existing=True,
func=self.run_service,
args=[task["id"]],
**trigger,
)
else:
job = self.scheduler.reschedule_job(str(task["id"]), **trigger)
return job.next_run_time > datetime.now(job.next_run_time.tzinfo)
scheduler = Scheduler()