Skip to content

Commit

Permalink
[ci][postmerge/2] schedule gap builds (ray-project#44576)
Browse files Browse the repository at this point in the history
- Add logic to compute the list of gap commits and schedule one build for each gap
- Implement the function to trigger a blocked build on postmerge
- Also cache the _get_builds function which is used repeatedly and quite expensive

Signed-off-by: can <[email protected]>
  • Loading branch information
can-anyscale authored Apr 16, 2024
1 parent 26af5b7 commit 7028479
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 3 deletions.
65 changes: 63 additions & 2 deletions ci/ray_ci/pipeline/gap_filling_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import subprocess
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Any
from typing import List, Dict, Optional, Any, Tuple

from pybuildkite.buildkite import Buildkite


BRANCH = "master"
BLOCK_STEP_KEY = "unblock-me"


class GapFillingScheduler:
Expand All @@ -29,6 +30,66 @@ def __init__(
self.repo_checkout = repo_checkout
self.days_ago = days_ago

def run(self) -> Dict[str, Optional[str]]:
"""
Create gap filling builds for the latest failing build. Return a mapping of
commit to the triggered build number.
"""
commits = self.get_gap_commits()

return {commit: self._trigger_build(commit) for commit in commits}

def get_gap_commits(self) -> List[str]:
"""
Return the list of commits between the latest passing and failing builds.
"""
failing_revision = self._get_latest_commit_for_build_state("failed")
passing_revision = self._get_latest_commit_for_build_state("passed")
return (
subprocess.check_output(
[
"git",
"rev-list",
"--reverse",
f"^{passing_revision}",
f"{failing_revision}~",
],
cwd=self.repo_checkout,
)
.decode("utf-8")
.strip()
.split("\n")
)

def _find_blocked_build_and_job(
self, commit: str
) -> Tuple[Optional[str], Optional[str]]:
for build in self._get_builds():
if build["commit"] != commit:
continue
if build["state"] != "blocked":
continue
for job in build["jobs"]:
if job.get("step_key") != BLOCK_STEP_KEY:
continue

return build["number"], job["id"]

return None, None

def _trigger_build(self, commit: str) -> Optional[str]:
build, job = self._find_blocked_build_and_job(commit)
if not build or not job:
return None

self.buildkite.jobs().unblock_job(
self.buildkite_organization,
self.buildkite_pipeline,
build,
job,
)
return build

def _get_latest_commit_for_build_state(self, build_state: str) -> Optional[str]:
latest_commits = self._get_latest_commits()
commit_to_index = {commit: index for index, commit in enumerate(latest_commits)}
Expand All @@ -48,7 +109,7 @@ def _get_latest_commits(self) -> List[str]:
[
"git",
"log",
"--pretty=tformat:'%H'",
"--pretty=tformat:%H",
f"--since={self.days_ago}.days",
],
cwd=self.repo_checkout,
Expand Down
97 changes: 96 additions & 1 deletion ci/ray_ci/pipeline/test_gap_filling_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pytest

from ci.ray_ci.pipeline.gap_filling_scheduler import GapFillingScheduler
from ci.ray_ci.pipeline.gap_filling_scheduler import GapFillingScheduler, BLOCK_STEP_KEY


@mock.patch(
Expand Down Expand Up @@ -37,5 +37,100 @@ def test_get_latest_commit_for_build_state(mock_get_builds, mock_get_latest_comm
assert scheduler._get_latest_commit_for_build_state("something") is None


@mock.patch("subprocess.check_output")
@mock.patch(
"ci.ray_ci.pipeline.gap_filling_scheduler.GapFillingScheduler._get_latest_commits"
)
@mock.patch("ci.ray_ci.pipeline.gap_filling_scheduler.GapFillingScheduler._get_builds")
def test_get_gap_commits(mock_get_builds, mock_get_latest_commits, mock_check_output):
def _mock_check_output_side_effect(cmd: str, cwd: str) -> str:
assert " ".join(cmd) == "git rev-list --reverse ^111 444~"
return b"222\n333\n"

mock_get_builds.return_value = [
{
"state": "passed",
"commit": "111",
},
{
"state": "failed",
"commit": "444",
},
]
mock_get_latest_commits.return_value = [
"444",
"111",
]
mock_check_output.side_effect = _mock_check_output_side_effect
scheduler = GapFillingScheduler("org", "pipeline", "token", "/ray")
assert scheduler.get_gap_commits() == ["222", "333"]


@mock.patch(
"ci.ray_ci.pipeline.gap_filling_scheduler.GapFillingScheduler._trigger_build"
)
@mock.patch(
"ci.ray_ci.pipeline.gap_filling_scheduler.GapFillingScheduler.get_gap_commits"
)
def test_run(mock_get_gap_commits, mock_trigger_build):
scheduler = GapFillingScheduler("org", "pipeline", "token", "/ray")

# no builds are triggered
mock_get_gap_commits.return_value = []
scheduler.run()
mock_trigger_build.assert_not_called()

# builds are triggered
mock_get_gap_commits.return_value = ["222", "333"]
scheduler.run()
mock_trigger_build.assert_has_calls(
[
mock.call("222"),
mock.call("333"),
],
)


@mock.patch("ci.ray_ci.pipeline.gap_filling_scheduler.GapFillingScheduler._get_builds")
def test_find_blocked_build_and_job(mock_get_builds):
scheduler = GapFillingScheduler("org", "pipeline", "token", "/ray")

mock_get_builds.return_value = [
# build 100 is blocked on job 3
{
"state": "blocked",
"number": "100",
"commit": "hi",
"jobs": [
{"id": "1"},
{"id": "2", "step_key": "not_block"},
{"id": "3", "step_key": BLOCK_STEP_KEY},
],
},
# step is blocked but build is not blocked
{
"state": "passed",
"number": "200",
"commit": "w00t",
"jobs": [
{"id": "1"},
{"id": "3", "step_key": BLOCK_STEP_KEY},
],
},
# build is blocked but step is not blocked
{
"state": "blocked",
"number": "200",
"commit": "bar",
"jobs": [
{"id": "1"},
],
},
]
scheduler._find_blocked_build_and_job("hi") == (100, 3)
scheduler._find_blocked_build_and_job("w00t") == (None, None)
scheduler._find_blocked_build_and_job("bar") == (None, None)


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))

0 comments on commit 7028479

Please sign in to comment.