-
Notifications
You must be signed in to change notification settings - Fork 157
/
Copy pathmanifest.py
256 lines (234 loc) · 10.6 KB
/
manifest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
"""Provide dump of software and libraries installed on CloudBioLinux image.
This provides an output YAML file with package details, providing a complete
dump of installed software and packages. The YAML output feeds into a BioGems
style webpage that provides a more human friendly view of installed packages.
The version information provides a reproducible dump of software on a system.
"""
import os
import collections
import json
import inspect
import subprocess
import sys
from six.moves import urllib
import yaml
try:
import yolk.yolklib
import yolk.metadata
except ImportError:
yolk = None
def create(out_dir, tooldir="/usr/local", fetch_remote=False):
"""Create a manifest in the output directory with installed packages.
"""
if not os.path.exists(out_dir):
os.makedirs(out_dir)
#write_debian_pkg_info(out_dir, fetch_remote)
write_python_pkg_info(out_dir)
write_r_pkg_info(out_dir)
#write_brew_pkg_info(out_dir, tooldir)
#write_custom_pkg_info(out_dir, tooldir)
# ## Custom packages
def _get_custom_pkg_info(name, fn):
"""Retrieve information about the installed package from the install function.
"""
vals = dict((k, v) for k, v in inspect.getmembers(fn))
code = inspect.getsourcelines(fn)
if vals["__name__"] == "decorator":
fn = [x for x in fn.func_closure if not isinstance(x.cell_contents, str)][0].cell_contents
vals = dict((k, v) for k, v in inspect.getmembers(fn))
code = inspect.getsourcelines(fn)
version = ""
for line in (l.strip() for l in code[0]):
if line.find("version") >= 0 and line.find(" =") > 0:
version = line.split()[-1].replace('"', '').replace("'", "")
if version:
break
doc = vals.get("func_doc", "")
descr, homepage = "", ""
if doc is not None:
descr = doc.split("\n")[0]
for line in doc.split("\n"):
if line.strip().startswith("http"):
homepage = line.strip()
return {"name": name.replace("install_", ""),
"description": descr,
"homepage_uri": homepage,
"version": version}
def write_custom_pkg_info(out_dir, tooldir):
custom_names = ["bio_general", "bio_nextgen", "cloudman", "distributed",
"java", "python", "phylogeny", "system"]
out_file = os.path.join(out_dir, "custom-packages.yaml")
if not os.path.exists(out_file):
out = {}
for modname in custom_names:
try:
mod = getattr(__import__("cloudbio.custom", globals(), locals(),
[modname], -1),
modname)
except ImportError as msg:
# Skip fabric import errors as we transition away from it
if "fabric" in str(msg):
mod = None
else:
raise
except ValueError:
mod = None
if mod:
for prog in [x for x in dir(mod) if x.startswith("install")]:
pkg = _get_custom_pkg_info(prog, getattr(mod, prog))
out[pkg["name"]] = pkg
with open(out_file, "w") as out_handle:
yaml.safe_dump(out, out_handle, default_flow_style=False, allow_unicode=False)
return out_file
# ## Homebrew/Linuxbrew packages
def write_brew_pkg_info(out_dir, tooldir):
"""Extract information for packages installed by homebrew/linuxbrew.
"""
out_file = os.path.join(out_dir, "brew-packages.yaml")
if not os.path.exists(out_file):
brew_cmd = os.path.join(tooldir, "bin", "brew") if tooldir else None
if not brew_cmd or not os.path.exists(brew_cmd):
brew_cmd = "brew"
try:
vout = subprocess.check_output([brew_cmd, "list", "--versions"]).decode()
except (OSError, subprocess.CalledProcessError): # brew not installed/used
vout = ""
out = {}
for vstr in vout.split("\n"):
if vstr.strip():
parts = vstr.rstrip().split()
name = parts[0]
v = parts[-1]
# remove any revisions from the version
v = v.rsplit("_", 1)[0]
out[name] = {"name": name, "version": v}
with open(out_file, "w") as out_handle:
yaml.safe_dump(out, out_handle, default_flow_style=False, allow_unicode=False)
return out_file
# ## R packages
def get_r_pkg_info():
r_command = ("options(width=10000); subset(installed.packages(fields=c('Title', 'URL')), "
"select=c('Version', 'Title','URL'))")
try:
out = subprocess.check_output(["Rscript", "-e", r_command]).decode()
except (subprocess.CalledProcessError, OSError):
out = ""
pkg_raw_list = []
for line in out.split("\n")[1:]:
pkg_raw_list.append(list(filter(None, [entry.strip(' ') for entry in line.split('"')])))
for pkg in pkg_raw_list:
if len(pkg) > 2:
yield {"name": pkg[0], "version": pkg[1],
"description": pkg[2],
"homepage_uri": (pkg[3], '')[pkg[3] == 'NA'] if len(pkg) > 3 else ""}
def write_r_pkg_info(out_dir):
out_file = os.path.join(out_dir, "r-packages.yaml")
if not os.path.exists(out_file):
out = {}
for pkg in get_r_pkg_info():
out[pkg["name"]] = pkg
with open(out_file, "w") as out_handle:
yaml.safe_dump(out, out_handle, default_flow_style=False, allow_unicode=False)
return out_file
# ## Python packages
def _get_conda_envs(conda_bin):
info = json.loads(subprocess.check_output("{conda_bin} info --envs --json".format(**locals()), shell=True))
prefix = info["conda_prefix"] + "/envs/"
return [e.replace(prefix, "") for e in info["envs"] if e.startswith(prefix)]
def get_python_pkg_info():
if yolk:
for dist in yolk.yolklib.Distributions().get_packages("all"):
md = yolk.metadata.get_metadata(dist)
yield {"name": md["Name"].lower(), "version": md["Version"],
"description": md.get("Summary", ""),
"homepage_uri": md.get("Home-page", "")}
else:
base_dir = os.path.dirname(os.path.realpath(sys.executable))
conda_bin = os.path.join(base_dir, "conda")
if os.path.exists(conda_bin):
for line in subprocess.check_output([conda_bin, "list"]).decode().split("\n"):
if line.strip() and not line.startswith("#"):
name, version = line.split()[:2]
yield {"name": name.lower(), "version": version}
for env in _get_conda_envs(conda_bin):
for line in subprocess.check_output([conda_bin, "list", "-n", env]).decode().split("\n"):
if line.strip() and not line.startswith("#"):
name, version = line.split()[:2]
yield {"name": name.lower(), "version": version}
else:
for line in subprocess.check_output([os.path.join(base_dir, "pip"), "list"]).decode().split("\n"):
if line.strip() and not line.startswith("#"):
name, version = line.split()[:2]
yield {"name": name.lower(), "version": version[1:-1]}
def _resolve_latest_pkg(pkgs):
if len(pkgs) == 1 or not yolk:
return pkgs[0]
else:
latest_version = yolk.yolklib.Distributions().get_highest_installed(pkgs[0]["name"])
return [x for x in pkgs if x["version"] == latest_version][0]
def write_python_pkg_info(out_dir):
out_file = os.path.join(out_dir, "python-packages.yaml")
if not os.path.exists(out_file):
pkgs_by_name = collections.defaultdict(list)
for pkg in get_python_pkg_info():
pkgs_by_name[pkg["name"]].append(pkg)
out = {}
for name in sorted(pkgs_by_name.keys()):
out[name] = _resolve_latest_pkg(pkgs_by_name[name])
with open(out_file, "w") as out_handle:
yaml.safe_dump(out, out_handle, default_flow_style=False, allow_unicode=False)
return out_file
# ## Debian packages
def _get_pkg_popcon():
"""Retrieve popularity information for debian packages.
"""
url = "http://popcon.debian.org/by_vote"
popcon = {}
for line in (l for l in urllib.request.urlopen(url) if not l.startswith(("#", "--"))):
parts = line.split()
popcon[parts[1]] = int(parts[3])
return popcon
def get_debian_pkg_info(fetch_remote=False):
pkg_popcon = _get_pkg_popcon() if fetch_remote else {}
cmd = ("dpkg-query --show --showformat "
"'${Status}\t${Package}\t${Version}\t${Section}\t${Homepage}\t${binary:Summary}\n'")
for pkg_line in [l for l in subprocess.check_output(cmd, shell=True).decode(errors="replace").split("\n")
if l.startswith("install ok")]:
parts = pkg_line.rstrip("\n").split("\t")
if len(parts) > 5:
pkg = {"name": parts[1], "version": parts[2],
"section": parts[3], "homepage_uri": parts[4],
"description": parts[5]}
if pkg_popcon.get(pkg["name"]):
pkg["downloads"] = pkg_popcon.get(pkg["name"], 0)
yield pkg
def write_debian_pkg_info(out_dir, fetch_remote=False):
base_sections = set(["gnome", "admin", "utils", "web", "games",
"sound", "devel", "kde", "x11", "net", "text",
"graphics", "misc", "editors", "fonts", "doc",
"mail", "otherosfs", "video", "kernel",
"libs", "libdevel", "comm", "metapackages", "tex",
"introspection"])
for s in list(base_sections):
base_sections.add("universe/%s" % s)
base_sections.add("partner/%s" % s)
out_file = os.path.join(out_dir, "debian-packages.yaml")
out_base_file = os.path.join(out_dir, "debian-base-packages.yaml")
try:
subprocess.check_call(["dpkg", "--help"], stdout=subprocess.PIPE)
has_dpkg = True
except (subprocess.CalledProcessError, OSError):
has_dpkg = False
if has_dpkg and (not os.path.exists(out_file) or not os.path.exists(out_base_file)):
out = {}
out_base = {}
for pkg in get_debian_pkg_info(fetch_remote):
if pkg.get("section") in base_sections:
out_base[pkg["name"]] = pkg
else:
out[pkg["name"]] = pkg
with open(out_file, "w") as out_handle:
yaml.safe_dump(out, out_handle, default_flow_style=False, allow_unicode=False)
with open(out_base_file, "w") as out_handle:
yaml.safe_dump(out_base, out_handle, default_flow_style=False, allow_unicode=False)
return out_file