Skip to content

Commit

Permalink
[forge] make fgi debug easier
Browse files Browse the repository at this point in the history
  • Loading branch information
rustielin authored and aptos-bot committed May 2, 2022
1 parent 9ad68e4 commit b6c34db
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 47 deletions.
28 changes: 22 additions & 6 deletions scripts/fgi/kube.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

# ================ Kube job ================
def create_forge_job(context, user, tag, base_tag, timeout_secs, forge_envs, forge_args):
"""Create the Forge K8s Job template"""
job_name = f"forge-{user}-{int(time.time())}"
job_name = job_name.replace("_", "-") # underscore not allowed in pod name
cluster_name = get_cluster_name_from_context(context)
Expand Down Expand Up @@ -95,14 +96,17 @@ def create_forge_job(context, user, tag, base_tag, timeout_secs, forge_envs, for


def get_cluster_context(cluster_name):
    """Get the Forge cluster context for use with kubectl.

    Contexts are EKS cluster ARNs; AWS_ACCOUNT is a module-level constant.
    """
    return "arn:aws:eks:us-west-2:{}:cluster/aptos-{}".format(
        AWS_ACCOUNT, cluster_name
    )


def get_cluster_name_from_context(context):
    """Get the Forge cluster name from the context.

    The context ARN looks like "arn:...:cluster/aptos-<name>"; the cluster
    name is the segment immediately after the first "/".
    """
    segments = context.split("/", 2)
    return segments[1]


def kube_ensure_cluster(clusters):
"""Returns the workspace name of a cluster that is free, otherwise None"""
attempts = 360
for attempt in range(attempts):
for cluster in clusters:
Expand Down Expand Up @@ -140,16 +144,19 @@ def kube_ensure_cluster(clusters):
return None


def kube_select_cluster():
    """
    Randomly select a cluster that is free based on its pod status:
    - no other forge pods currently Running or Pending
    - all monitoring pods are ready
    """
    # Visit the clusters in a random order so test load spreads across them.
    candidates = list(FORGE_K8S_CLUSTERS)
    random.shuffle(candidates)
    return kube_ensure_cluster(candidates)


def kube_wait_job(job_name, context):
"""Wait for a K8s Job to be in a healthy state"""
attempts = 360
for _ in range(attempts):
try:
Expand All @@ -173,8 +180,6 @@ def kube_wait_job(job_name, context):
f"kubectl --context='{context}' get pod --selector=job-name={job_name} | grep -i -e ImagePullBackOff -e "
f"InvalidImageName -e ErrImagePull",
shell=True,
# stdout=subprocess.DEVNULL,
# stderr=subprocess.DEVNULL,
)
if ret == 0:
image_name = get_forge_image_name(job_name, context)
Expand All @@ -194,8 +199,8 @@ def kube_wait_job(job_name, context):
return 1


# init the kube context for each available cluster
def kube_init_context(workspace=None):
"""Init the kube context for each available cluster, to ensure we can reach it"""
try:
subprocess.run(
[
Expand Down Expand Up @@ -233,6 +238,7 @@ def kube_init_context(workspace=None):


def get_forge_pods_by_phase(context, phase):
"""Get all Forge pods by phase"""
try:
return json.loads(
subprocess.check_output(
Expand All @@ -254,6 +260,7 @@ def get_forge_pods_by_phase(context, phase):


def get_monitoring_pod(context):
"""Get all monitoring pods"""
return json.loads(
subprocess.check_output(
[
Expand All @@ -271,16 +278,19 @@ def get_monitoring_pod(context):


def get_forge_image_name(job_name, context):
    """Get the image name of the specified Forge job."""
    # The job runs a single container; read its image from the pod spec.
    image_jsonpath = "{.items[0].spec.containers[0].image}"
    return get_forge_job_jsonpath(job_name, context, image_jsonpath)


def get_forge_job_phase(job_name, context):
    """Get the current phase of the specified Forge job."""
    # Phase comes from the job's pod status (e.g. Pending, Running, Succeeded).
    phase_jsonpath = "{.items[0].status.phase}"
    return get_forge_job_jsonpath(job_name, context, phase_jsonpath)


def get_forge_job_jsonpath(job_name, context, jsonpath):
"""Get the Forge job spec at the specified jsonpath"""
return subprocess.check_output(
[
"kubectl",
Expand All @@ -296,6 +306,7 @@ def get_forge_job_jsonpath(job_name, context, jsonpath):


def helm_s3_init(workspace):
"""Initializes the S3 bucket used as an internal Helm repo for Forge"""
bucket_url = WORKSPACE_CHART_BUCKETS[workspace]
subprocess.run(
f"helm plugin install https://github.com/hypnoglow/helm-s3.git || true",
Expand All @@ -314,6 +325,7 @@ def helm_s3_init(workspace):


def helm_package_push(chart_path, chart_name, workspace, dir):
"""Packages the helm charts at the given path and pushes it to the internal helm repo on S3"""
subprocess.run(
[
"helm",
Expand All @@ -336,6 +348,10 @@ def helm_package_push(chart_path, chart_name, workspace, dir):


def push_helm_charts(workspace):
"""
Push all helm charts for usage by Forge
Run from aptos-core root directory
"""
helm_s3_init(workspace)
tempdir = tempfile.mkdtemp()
helm_package_push("terraform/testnet/testnet",
Expand Down
104 changes: 63 additions & 41 deletions scripts/fgi/run
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import subprocess
import sys
import tempfile
import time
from datetime import datetime
from datetime import datetime, timezone

from kube import (
kube_init_context,
Expand All @@ -28,8 +28,10 @@ WORKSPACE = ""
REPORT = ""
DEFL_TIMEOUT_SECS = 1800 # Default timeout is 30 mins
USER = getpass.getuser() # Use the current user for naming
OUTPUT_TEE = os.getenv("FGI_OUTPUT_LOG", tempfile.mkstemp()[1])
TIME_FMT = '%Y-%m-%dT%H:%M:%S.000Z'
FORGE_OUTPUT_TEE = os.getenv("FGI_OUTPUT_LOG", tempfile.mkstemp()[1])

OPENSEARCH_TIME_FMT = '%Y-%m-%dT%H:%M:%S.000Z'
DEFL_FORGE_REPORT = "forge_report.json"

HEADER = "\033[95m"
OKBLUE = "\033[94m"
Expand All @@ -38,8 +40,9 @@ WARNING = "\033[93m"
FAIL = "\033[91m"
RESTORE = "\033[0m"

# build the arg parser and return a tuple of (fgi args, forge args)

def build_argparser():
"""Build the arg parser and return a tuple of (fgi args, forge args)"""
parser = argparse.ArgumentParser(
description="Entrypoint for the Forge unified testing framework"
)
Expand All @@ -61,8 +64,10 @@ def build_argparser():
help="Extra environment variables to pass to Forge",
)
build_group = parser.add_mutually_exclusive_group(required=True)
build_group.add_argument("--tag", "-T", help="Image tag to use in kubernetes Forge tests")
build_group.add_argument("--pr", "-p", help="PR to build images from for kubernetes Forge tests")
build_group.add_argument(
"--tag", "-T", help="Image tag to use in kubernetes Forge tests")
build_group.add_argument(
"--pr", "-p", help="PR to build images from for kubernetes Forge tests")
build_group.add_argument(
"--local-swarm",
"-L",
Expand All @@ -77,14 +82,14 @@ def build_argparser():
parser.add_argument(
"--report",
"-R",
default=DEFL_FORGE_REPORT,
help="Dump report to file"
)
return parser.parse_known_args()


def get_grafana_url(cluster_name):
    """Return the Grafana overview dashboard URL for the given Forge cluster.

    The URL already carries a query string, so callers can append extra
    parameters (e.g. "&from=...&to=...&refresh=10s") directly.
    """
    # NOTE(review): the flattened diff left the old per-cluster
    # "mon.<cluster>.aptosdev.com" URL in front of this return, which made the
    # centralized o11y URL below unreachable. Callers append "&from=..."
    # suffixes, which only form a valid URL with this query-string version.
    return (
        "http://o11y.aptosdev.com/grafana/d/overview/overview?orgId=1"
        "&var-Datasource=Remote%20Prometheus%20Devinfra"
        f"&var-chain_name=aptos-{cluster_name}&var-owner=All"
    )


def cli_tool_installed(tool_name):
Expand All @@ -95,7 +100,32 @@ def cli_tool_installed(tool_name):


def get_opensearch_time(timestamp_ms):
    """Convert a unix timestamp in milliseconds to an OpenSearch time string.

    OPENSEARCH_TIME_FMT hardcodes a UTC ("Z") suffix, so the timestamp must
    be rendered in UTC rather than the machine's local timezone.
    """
    # NOTE(review): the flattened diff left the old naive-localtime return
    # ahead of this one, shadowing it; only the timezone-aware conversion
    # matches the "Z" suffix in the format string.
    return datetime.fromtimestamp(
        timestamp_ms / 1000, timezone.utc
    ).strftime(OPENSEARCH_TIME_FMT)


def print_streaming_o11y_resources(grafana_url, start_ts_ms):
    """Print a live Grafana dashboard link for a currently running test."""
    # "to=now" plus a 10s refresh keeps the dashboard tracking the test live.
    dashboard_line = (
        f"{OKBLUE}Auto refresh Dashboard:{RESTORE} "
        f"{grafana_url}&from={start_ts_ms}&to=now&refresh=10s"
    )
    print("\n**********")
    print(dashboard_line)
    print("**********")

def get_final_o11y_resources(grafana_url, workspace, start_ts_ms, end_ts_ms):
    """Print and return fixed-window dashboard and log links for a finished test.

    Returns:
        Tuple of (dashboard link, logging link) URL strings.
    """
    # Pin the dashboard to the exact test window instead of "now".
    dashboard_link = f"{grafana_url}&from={start_ts_ms}&to={end_ts_ms}"
    # OpenSearch Dashboards deep link: filter logs to this workspace's chain
    # name over the same window (OpenSearch wants ISO-8601 UTC timestamps).
    logging_link = (
        f"https://es.devinfra.aptosdev.com/_dashboards/app/discover#/?"
        f"_g=(filters:!(),refreshInterval:(pause:!t,value:0),time:(from:'{get_opensearch_time(start_ts_ms)}',to:'{get_opensearch_time(end_ts_ms)}'))"
        f"&_a=(columns:!(_source),filters:!(('$state':(store:appState),"
        f"meta:(alias:!n,disabled:!f,index:d0bc5e20-badc-11ec-9a50-89b84ac337af,key:chain_name,negate:!f,params:(query:aptos-{workspace}),type:phrase),"
        f"query:(match_phrase:(chain_name:aptos-{workspace})))),"
        f"index:d0bc5e20-badc-11ec-9a50-89b84ac337af,interval:auto,query:(language:kuery,query:''),sort:!())"
    )

    print("\n**********")
    print(f"{OKBLUE}Dashboard snapshot:{RESTORE} {dashboard_link}")
    print(f"{OKBLUE}Logs snapshot:{RESTORE} {logging_link}")
    print("**********\n")
    return dashboard_link, logging_link


# ================ Parse the args ================
Expand All @@ -115,7 +145,8 @@ if not args.tag:
print(f"{FAIL}Failed to access codebuild. Try aws-mfa?{RESTORE}")
sys.exit(1)
ret = subprocess.call(
["./docker/build-aws.sh", "--build-forge", "--version", f"pull/{args.pr}"]
["./docker/build-aws.sh", "--build-forge",
"--version", f"pull/{args.pr}"]
)
if ret != 0:
print(f"{FAIL}Failed to build forge.")
Expand All @@ -138,7 +169,8 @@ print(f"""

if args.local_swarm:
print("Running Forge on backend: local swarm")
ret = subprocess.call(["cargo", "run", "-p", "forge-cli", "--", "test", "local-swarm"])
ret = subprocess.call(
["cargo", "run", "-p", "forge-cli", "--", "test", "local-swarm"])
sys.exit(ret)

print("Running Forge on backend: kubernetes testnet")
Expand All @@ -164,11 +196,11 @@ if not args.workspace:
else:
ret = kube_ensure_cluster([workspace])
if not ret:
print(f"{FAIL}Failed to acquire specified forge testnet cluster {workspace}{RESTORE}")
print(
f"{FAIL}Failed to acquire specified forge testnet cluster {workspace}{RESTORE}")
sys.exit(1)
context = get_cluster_context(workspace)
print(f"Running experiments on cluster: {workspace}")
grafana_url = get_grafana_url(workspace)
push_helm_charts(workspace)
print()

Expand All @@ -186,7 +218,8 @@ print(f"Specfile: {specfile}")

# ================ Create and run the job ================
print(f"Creating job: {job_name}")
ret = subprocess.call(["kubectl", f"--context={context}", "apply", "-f", specfile])
ret = subprocess.call(
["kubectl", f"--context={context}", "apply", "-f", specfile])
if ret != 0:
print(f"{FAIL}Failed to create forge job{RESTORE}")
sys.exit(1)
Expand All @@ -199,20 +232,18 @@ if ret != 0:
# account for the time delta between querying pod status and finishing waiting
delta_ms = 1000
start_ts_ms = int(time.time() * 1000) - delta_ms
print("\n**********")
print(
f"{OKBLUE}Auto refresh Dashboard:{RESTORE} {grafana_url}/d/overview/overview?from={start_ts_ms}&to=now&refresh"
f"=10s&orgId=1 "
)
print("**********")
grafana_url = get_grafana_url(workspace)

print_streaming_o11y_resources(grafana_url, start_ts_ms)

print("==========begin-pod-logs==========")
print("==========begin-forge-pod-logs==========")
# TODO(rustielin): might have to retry this if kube reschedules it
subprocess.call(
f"kubectl --context={context} logs -f -l job-name={job_name} | tee -a {OUTPUT_TEE}",
f"kubectl --context={context} logs -f -l job-name={job_name} | tee -a {FORGE_OUTPUT_TEE}",
shell=True,
)
print("==========end-pod-logs==========")
print(f"\nLog output: {OUTPUT_TEE}")
print("==========end-forge-pod-logs==========")
print(f"\nForge log output: {FORGE_OUTPUT_TEE}")

try:
job_status = json.loads(
Expand All @@ -234,25 +265,15 @@ except Exception as e:
job_status = {"failed": 1}

end_ts_ms = int(time.time() * 1000)
DASHBOARD_LINK = f"{grafana_url}/d/overview/overview?from={start_ts_ms}&to={end_ts_ms}&orgId=1"
LOGGING_LINK = (
"https://es.devinfra.aptosdev.com/_dashboards/app/discover#/?_g=(filters:!(),"
f"refreshInterval:(pause:!t,value:0),time:(from:'{get_opensearch_time(start_ts_ms)}',to:'{get_opensearch_time(end_ts_ms)}'))&_a=(columns:!(_source),filters:!,"
"(('$state':(store:appState),meta:(alias:!n,disabled:!f,index:ac3499a0-abc8-11ec-9a50-89b84ac337af,"
f"key:chain_name,negate:!f,params:(query:aptos-{workspace}),type:phrase),query:(match_phrase:(chain_name:aptos-{workspace})))),"
"index:ac3499a0-abc8-11ec-9a50-89b84ac337af,interval:auto,query:(language:kuery,query:''),sort:!())"
)

print("\n**********")
print(f"{OKBLUE}Dashboard snapshot:{RESTORE} {DASHBOARD_LINK}")
print(f"{OKBLUE}Logs snapshot:{RESTORE} {LOGGING_LINK}")
print("**********\n")
DASHBOARD_LINK, LOGGING_LINK = get_final_o11y_resources(
grafana_url, workspace, start_ts_ms, end_ts_ms)

test_res = 'failed'
# perf report
test_report = []
read_lines = False
with open(f"{OUTPUT_TEE}", 'r') as file:
with open(f"{FORGE_OUTPUT_TEE}", 'r') as file:
for line in file.readlines():
if 'json-report-end' in line:
read_lines = False
Expand All @@ -263,13 +284,14 @@ with open(f"{OUTPUT_TEE}", 'r') as file:
if 'test result: ok' in line:
test_res = 'passed'
if len(test_report) == 0:
# If Forge emits no report, return a generic error
test_report.append("{\"text\": \"Forge test runner is terminated\"}")
temp = json.loads(''.join(test_report))
temp["logs"] = "Logs: " + LOGGING_LINK
temp["dashboard"] = "Dashboard: " + DASHBOARD_LINK
temp_forge_report = json.loads(''.join(test_report))
temp_forge_report["logs"] = LOGGING_LINK
temp_forge_report["dashboard"] = DASHBOARD_LINK
if args.report:
with open(f"{REPORT}", 'w') as file_object:
file_object.write(json.dumps(temp))
file_object.write(json.dumps(temp_forge_report))


if "failed" in job_status and job_status["failed"] == 1:
Expand Down

0 comments on commit b6c34db

Please sign in to comment.