This repository has been archived by the owner on Aug 9, 2022. It is now read-only.
forked from pytorch/pytorch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathupload_stats_lib.py
151 lines (124 loc) · 4.68 KB
/
upload_stats_lib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import gzip
import io
import json
import os
import zipfile
from pathlib import Path
from typing import Any, Dict, List
import boto3 # type: ignore[import]
import requests
import rockset # type: ignore[import]
PYTORCH_REPO = "https://api.github.com/repos/pytorch/pytorch"
S3_RESOURCE = boto3.resource("s3")
def _get_request_headers() -> Dict[str, str]:
return {
"Accept": "application/vnd.github.v3+json",
"Authorization": "token " + os.environ["GITHUB_TOKEN"],
}
def _get_artifact_urls(prefix: str, workflow_run_id: int) -> Dict[Path, str]:
"""Get all workflow artifacts with 'test-report' in the name."""
response = requests.get(
f"{PYTORCH_REPO}/actions/runs/{workflow_run_id}/artifacts?per_page=100",
)
artifacts = response.json()["artifacts"]
while "next" in response.links.keys():
response = requests.get(
response.links["next"]["url"], headers=_get_request_headers()
)
artifacts.extend(response.json()["artifacts"])
artifact_urls = {}
for artifact in artifacts:
if artifact["name"].startswith(prefix):
artifact_urls[Path(artifact["name"])] = artifact["archive_download_url"]
return artifact_urls
def _download_artifact(
artifact_name: Path, artifact_url: str, workflow_run_attempt: int
) -> Path:
# [Artifact run attempt]
# All artifacts on a workflow share a single namespace. However, we can
# re-run a workflow and produce a new set of artifacts. To avoid name
# collisions, we add `-runattempt1<run #>-` somewhere in the artifact name.
#
# This code parses out the run attempt number from the artifact name. If it
# doesn't match the one specified on the command line, skip it.
atoms = str(artifact_name).split("-")
for atom in atoms:
if atom.startswith("runattempt"):
found_run_attempt = int(atom[len("runattempt") :])
if workflow_run_attempt != found_run_attempt:
print(
f"Skipping {artifact_name} as it is an invalid run attempt. "
f"Expected {workflow_run_attempt}, found {found_run_attempt}."
)
print(f"Downloading {artifact_name}")
response = requests.get(artifact_url, headers=_get_request_headers())
with open(artifact_name, "wb") as f:
f.write(response.content)
return artifact_name
def download_s3_artifacts(
prefix: str, workflow_run_id: int, workflow_run_attempt: int
) -> List[Path]:
bucket = S3_RESOURCE.Bucket("gha-artifacts")
objs = bucket.objects.filter(
Prefix=f"pytorch/pytorch/{workflow_run_id}/{workflow_run_attempt}/artifact/{prefix}"
)
found_one = False
paths = []
for obj in objs:
found_one = True
p = Path(Path(obj.key).name)
print(f"Downloading {p}")
with open(p, "wb") as f:
f.write(obj.get()["Body"].read())
paths.append(p)
if not found_one:
print(
"::warning title=s3 artifacts not found::"
"Didn't find any test reports in s3, there might be a bug!"
)
return paths
def download_gha_artifacts(
prefix: str, workflow_run_id: int, workflow_run_attempt: int
) -> List[Path]:
artifact_urls = _get_artifact_urls(prefix, workflow_run_id)
paths = []
for name, url in artifact_urls.items():
paths.append(_download_artifact(Path(name), url, workflow_run_attempt))
return paths
def upload_to_rockset(collection: str, docs: List[Any]) -> None:
print(f"Writing {len(docs)} documents to Rockset")
client = rockset.Client(
api_server="api.rs2.usw2.rockset.com", api_key=os.environ["ROCKSET_API_KEY"]
)
client.Collection.retrieve(collection).add_docs(docs)
print("Done!")
def upload_to_s3(
workflow_run_id: int,
workflow_run_attempt: int,
collection: str,
docs: List[Dict[str, Any]],
) -> None:
print(f"Writing {len(docs)} documents to S3")
body = io.StringIO()
for doc in docs:
json.dump(doc, body)
body.write("\n")
S3_RESOURCE.Object(
"ossci-raw-job-status",
f"{collection}/{workflow_run_id}/{workflow_run_attempt}",
).put(
Body=gzip.compress(body.getvalue().encode()),
ContentEncoding="gzip",
ContentType="application/json",
)
print("Done!")
def unzip(p: Path) -> None:
"""Unzip the provided zipfile to a similarly-named directory.
Returns None if `p` is not a zipfile.
Looks like: /tmp/test-reports.zip -> /tmp/unzipped-test-reports/
"""
assert p.is_file()
unzipped_dir = p.with_name("unzipped-" + p.stem)
print(f"Extracting {p} to {unzipped_dir}")
with zipfile.ZipFile(p, "r") as zip:
zip.extractall(unzipped_dir)