-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrunner.py
130 lines (119 loc) · 4.02 KB
/
runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import utils as utl
import importlib
from os.path import dirname,splitext,basename,join
import state
from datetime import datetime
import graph_utils as gutl
from containers import run
class ArtifactError(Exception):
    """Error raised by the artifact and asset registry helpers.

    In this module it is raised when an artifact or asset ID is registered
    twice, and when a requested artifact ID has not been registered.
    """
def log_job(stage_name, job_name, start):
    """Append a timing record for a finished job to ``state.pipe``.

    The stop time is sampled with ``datetime.now()`` at the moment this
    function is called; ``start`` is the datetime captured by the caller.
    The record holds both timestamps and their difference as strings, plus
    the text returned by ``utl.duration_text`` for the same difference.

    Args:
        stage_name: Name of the stage the job belongs to.
        job_name: Name of the job that just ran.
        start: ``datetime`` taken before the job started.
    """
    finished = datetime.now()
    elapsed = finished - start
    record = {
        "stage": stage_name,
        "job": job_name,
        "start": str(start),
        "stop": str(finished),
        "duration": str(elapsed),
        "duration_text": utl.duration_text(elapsed),
    }
    state.pipe.append(record)
def set_artifact(data, filepath, type="generic"):
    """Register an artifact in ``state.artifacts`` and persist JSON payloads.

    The artifact ID is the file name of ``filepath`` without its extension.
    Only ``.json`` files are written (under the ``cache`` directory); any
    other extension is registered without writing ``data`` here.  An edge
    from the current job (``state.job``) to the artifact is added to the
    dependency graph.

    Args:
        data: Payload to store.
        filepath: Path of the artifact, relative to the ``cache`` directory.
        type: Free-form type tag stored in the registry entry.

    Raises:
        ArtifactError: If an artifact with the same ID is already registered.
    """
    artifact_id, extension = splitext(basename(filepath))
    if artifact_id in state.artifacts:
        raise ArtifactError(f"Artifact with ID '{artifact_id}' already exists.")

    entry = {
        "path": dirname(filepath),
        "ext": extension,
        "type": type,
        "filepath": filepath,
    }
    state.artifacts[artifact_id] = entry

    if extension == ".json":
        utl.save_json(data, join("cache", filepath))

    # Dependency edge: the running job produces this artifact.
    producer = {"label": state.job, "class": "job"}
    product = {"label": artifact_id, "class": "artifact"}
    gutl.add_edge(producer, product)
def get_artifact(id):
    """Load a registered artifact and record that the current job uses it.

    An edge from the artifact to the current job (``state.job``) is added to
    the dependency graph before the file is read.

    Args:
        id: Artifact ID, as registered in ``state.artifacts``.

    Returns:
        The parsed JSON content for ``.json`` artifacts, otherwise ``None``.

    Raises:
        ArtifactError: If no artifact with this ID is registered.
    """
    registry = state.artifacts
    if id not in registry:
        raise ArtifactError(f"Artifact with ID '{id}' does not exist")

    entry = registry[id]
    source = {"label": id, "class": "artifact"}
    consumer = {"label": state.job, "class": "job"}
    gutl.add_edge(source, consumer)

    if entry["ext"] != ".json":
        return None
    return utl.load_json(join("cache", entry["filepath"]))
def set_asset(data, filepath, type="generic"):
    """Register an asset in ``state.assets`` and write it under ``cache/assets``.

    The asset ID is the file name of ``filepath`` without its extension.
    ``.json`` files are written with ``utl.save_json`` and ``.dot`` files with
    ``utl.save_text``.  When ``type`` is ``"graph"``, the DOT text returned by
    ``gutl.get_dot_graph(data)`` is additionally written to ``<id>.dot`` in
    the same directory and handed to ``run.graphviz``.

    Args:
        data: Payload to store.
        filepath: Path of the asset, relative to the ``assets`` directory.
        type: Type tag stored in the registry entry; ``"graph"`` also
            triggers the DOT export described above.

    Raises:
        ArtifactError: If an asset with the same ID is already registered.
    """
    filepath = join("assets", filepath)
    asset_id, ext = splitext(basename(filepath))
    if asset_id in state.assets:
        raise ArtifactError(f"Asset with ID '{asset_id}' already exists.")

    path = dirname(filepath)
    state.assets[asset_id] = {
        "path": path,
        "ext": ext,
        "type": type,
        # Stored with forward slashes regardless of the OS separator.
        "filepath": filepath.replace("\\", "/"),
    }

    abs_filepath = join("cache", filepath)
    if ext == ".json":
        utl.save_json(data, abs_filepath)
    elif ext == ".dot":
        utl.save_text(data, abs_filepath)

    if type == "graph":
        # Use one relative path for both the write and the graphviz call.
        # Previously graphviz always received "assets/<id>.dot", which did
        # not match the written file when the asset lived in a sub-directory.
        # NOTE(review): presumably run.graphviz resolves this path relative to
        # the cache directory, as the old hard-coded argument implied - confirm.
        dot_relpath = join(path, f"{asset_id}.dot")
        utl.save_text(gutl.get_dot_graph(data), join("cache", dot_relpath))
        run.graphviz(dot_relpath.replace("\\", "/"))
def run_stage(stage_name, jobs):
    """Run every job of one pipeline stage, in mapping order.

    Each job spec is a string of the form ``"module#function"``; a trailing
    ``.py`` on the module part is ignored.  For every job the module is
    imported, the function is called without arguments, and a timing record
    is appended through ``log_job``.  ``state.stage``, ``state.job`` and
    ``state.step`` are updated so that code running inside the job can see
    which job is active.

    Args:
        stage_name: Name of the stage being run.
        jobs: Mapping of job name to ``"module#function"`` spec.

    Raises:
        ValueError: If a job spec does not contain exactly one ``#``.
    """
    print(f"Running stage: {stage_name}")
    state.stage = stage_name
    for job_name, job in jobs.items():
        module_name, separator, function_name = job.partition('#')
        if not separator or '#' in function_name:
            raise ValueError(
                f"Job '{job_name}' must be of the form 'module#function', got '{job}'"
            )
        # Strip only a trailing ".py".  A blanket replace() would also mangle
        # dotted module paths that merely contain ".py" (e.g. "pkg.pytools").
        if module_name.endswith('.py'):
            module_name = module_name[:-len('.py')]

        state.job = job_name
        state.step = function_name
        module = importlib.import_module(module_name)
        func = getattr(module, function_name)
        print(f" Executing job: {job_name}")
        start = datetime.now()
        func()
        log_job(stage_name, job_name, start)
def run_post_build():
    """Write the summary assets produced after all stages have run.

    In order: the job log (``pipeline.json``), the dependency graph
    (``dependencies.json``, stored with type ``"graph"``), the timeline
    derived from the job log (``timeline.json``) and finally the registry
    of artifacts merged with the assets registered so far
    (``artifacts.json``).
    """
    set_asset(state.pipe, "pipeline.json")
    set_asset(gutl.get_graph(), "dependencies.json", type="graph")
    set_asset(generate_timeline(state.pipe), "timeline.json")

    # The asset entries are merged into state.artifacts in place, so the
    # artifact registry itself is modified before being written out.
    state.artifacts.update(state.assets)
    set_asset(state.artifacts, "artifacts.json")
def run_pipeline(pipeline):
    """Run all stages of ``pipeline`` in order, then the post-build step.

    Args:
        pipeline: Mapping of stage name to that stage's jobs mapping, as
            accepted by ``run_stage``.
    """
    # NOTE(review): the consumer of state.value is not visible in this file.
    state.value = 1
    for stage_name, stage_jobs in pipeline.items():
        run_stage(stage_name, stage_jobs)
    # Runs once, after every stage has completed.
    run_post_build()
def generate_timeline(pipe):
    """Build a timeline structure (items and groups) from job log records.

    Every record becomes one item, grouped by its stage.  Each distinct stage
    yields exactly one group, in order of first appearance.

    Args:
        pipe: Iterable of records with the keys ``"stage"``, ``"job"``,
            ``"start"`` and ``"stop"``; other keys are ignored.

    Returns:
        A dict ``{"items": [...], "groups": [...]}`` where each item has the
        keys ``id``, ``content``, ``group``, ``start`` and ``end``, and each
        group has the keys ``id`` and ``content``.
    """
    items = []
    groups = []
    seen_stages = set()  # used to keep group entries unique, O(1) lookups
    for record in pipe:
        stage = record["stage"]
        if stage not in seen_stages:
            seen_stages.add(stage)
            groups.append({"id": stage, "content": stage})
        items.append({
            "id": record["job"],
            "content": record["job"],
            "group": stage,
            "start": record["start"],
            "end": record["stop"],
        })
    return {"items": items, "groups": groups}
# Loaded whenever this module is imported or run, not only in script mode,
# so `manifest` is always available as a module attribute.
manifest = utl.load_yaml("manifest.yaml")

# Script entry point: run the pipeline described in the manifest.
if __name__ == "__main__":
    run_pipeline(manifest["pipeline"])