Skip to content

Commit

Permalink
Merge pull request grpc#22661 from ericgribkoff/backport_retries
Browse files Browse the repository at this point in the history
Merge pull request grpc#22659 from ericgribkoff/gcp_retries
  • Loading branch information
ericgribkoff authored Apr 14, 2020
2 parents bdbdd3f + ca67386 commit 0dc94dc
Showing 1 changed file with 75 additions and 52 deletions.
127 changes: 75 additions & 52 deletions tools/run_tests/run_xds_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ def parse_port_range(port_arg):
_NUM_TEST_RPCS = 10 * args.qps
_WAIT_FOR_STATS_SEC = 180
_WAIT_FOR_URL_MAP_PATCH_SEC = 300
_GCP_API_RETRIES = 5
_BOOTSTRAP_TEMPLATE = """
{{
"node": {{
Expand Down Expand Up @@ -549,8 +550,8 @@ def create_instance_template(gcp, name, network, source_image, machine_type,
}

logger.debug('Sending GCP request with body=%s', config)
result = gcp.compute.instanceTemplates().insert(project=gcp.project,
body=config).execute()
result = gcp.compute.instanceTemplates().insert(
project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
gcp.instance_template = GcpResource(config['name'], result['targetLink'])

Expand All @@ -567,13 +568,14 @@ def add_instance_group(gcp, zone, name, size):
}

logger.debug('Sending GCP request with body=%s', config)
result = gcp.compute.instanceGroupManagers().insert(project=gcp.project,
zone=zone,
body=config).execute()
result = gcp.compute.instanceGroupManagers().insert(
project=gcp.project, zone=zone,
body=config).execute(num_retries=_GCP_API_RETRIES)
wait_for_zone_operation(gcp, zone, result['name'])
result = gcp.compute.instanceGroupManagers().get(
project=gcp.project, zone=zone,
instanceGroupManager=config['name']).execute()
instanceGroupManager=config['name']).execute(
num_retries=_GCP_API_RETRIES)
instance_group = InstanceGroup(config['name'], result['instanceGroup'],
zone)
gcp.instance_groups.append(instance_group)
Expand All @@ -600,8 +602,8 @@ def create_health_check(gcp, name):
}
compute_to_use = gcp.compute
logger.debug('Sending GCP request with body=%s', config)
result = compute_to_use.healthChecks().insert(project=gcp.project,
body=config).execute()
result = compute_to_use.healthChecks().insert(
project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
gcp.health_check = GcpResource(config['name'], result['targetLink'])

Expand All @@ -617,8 +619,8 @@ def create_health_check_firewall_rule(gcp, name):
'targetTags': ['allow-health-checks'],
}
logger.debug('Sending GCP request with body=%s', config)
result = gcp.compute.firewalls().insert(project=gcp.project,
body=config).execute()
result = gcp.compute.firewalls().insert(
project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
gcp.health_check_firewall_rule = GcpResource(config['name'],
result['targetLink'])
Expand All @@ -639,8 +641,8 @@ def add_backend_service(gcp, name):
'protocol': protocol
}
logger.debug('Sending GCP request with body=%s', config)
result = compute_to_use.backendServices().insert(project=gcp.project,
body=config).execute()
result = compute_to_use.backendServices().insert(
project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
backend_service = GcpResource(config['name'], result['targetLink'])
gcp.backend_services.append(backend_service)
Expand All @@ -661,8 +663,8 @@ def create_url_map(gcp, name, backend_service, host_name):
}]
}
logger.debug('Sending GCP request with body=%s', config)
result = gcp.compute.urlMaps().insert(project=gcp.project,
body=config).execute()
result = gcp.compute.urlMaps().insert(
project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
gcp.url_map = GcpResource(config['name'], result['targetLink'])

Expand All @@ -675,9 +677,9 @@ def patch_url_map_host_rule_with_port(gcp, name, backend_service, host_name):
}]
}
logger.debug('Sending GCP request with body=%s', config)
result = gcp.compute.urlMaps().patch(project=gcp.project,
urlMap=name,
body=config).execute()
result = gcp.compute.urlMaps().patch(
project=gcp.project, urlMap=name,
body=config).execute(num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])


Expand All @@ -690,15 +692,17 @@ def create_target_proxy(gcp, name):
}
logger.debug('Sending GCP request with body=%s', config)
result = gcp.alpha_compute.targetGrpcProxies().insert(
project=gcp.project, body=config).execute()
project=gcp.project,
body=config).execute(num_retries=_GCP_API_RETRIES)
else:
config = {
'name': name,
'url_map': gcp.url_map.url,
}
logger.debug('Sending GCP request with body=%s', config)
result = gcp.compute.targetHttpProxies().insert(project=gcp.project,
body=config).execute()
result = gcp.compute.targetHttpProxies().insert(
project=gcp.project,
body=config).execute(num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
gcp.target_proxy = GcpResource(config['name'], result['targetLink'])

Expand All @@ -720,7 +724,8 @@ def create_global_forwarding_rule(gcp, name, potential_ports):
}
logger.debug('Sending GCP request with body=%s', config)
result = compute_to_use.globalForwardingRules().insert(
project=gcp.project, body=config).execute()
project=gcp.project,
body=config).execute(num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
gcp.global_forwarding_rule = GcpResource(config['name'],
result['targetLink'])
Expand All @@ -736,7 +741,8 @@ def delete_global_forwarding_rule(gcp):
try:
result = gcp.compute.globalForwardingRules().delete(
project=gcp.project,
forwardingRule=gcp.global_forwarding_rule.name).execute()
forwardingRule=gcp.global_forwarding_rule.name).execute(
num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
Expand All @@ -747,11 +753,13 @@ def delete_target_proxy(gcp):
if gcp.alpha_compute:
result = gcp.alpha_compute.targetGrpcProxies().delete(
project=gcp.project,
targetGrpcProxy=gcp.target_proxy.name).execute()
targetGrpcProxy=gcp.target_proxy.name).execute(
num_retries=_GCP_API_RETRIES)
else:
result = gcp.compute.targetHttpProxies().delete(
project=gcp.project,
targetHttpProxy=gcp.target_proxy.name).execute()
targetHttpProxy=gcp.target_proxy.name).execute(
num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
Expand All @@ -760,7 +768,8 @@ def delete_target_proxy(gcp):
def delete_url_map(gcp):
try:
result = gcp.compute.urlMaps().delete(
project=gcp.project, urlMap=gcp.url_map.name).execute()
project=gcp.project,
urlMap=gcp.url_map.name).execute(num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
Expand All @@ -771,7 +780,8 @@ def delete_backend_services(gcp):
try:
result = gcp.compute.backendServices().delete(
project=gcp.project,
backendService=backend_service.name).execute()
backendService=backend_service.name).execute(
num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
Expand All @@ -781,7 +791,8 @@ def delete_firewall(gcp):
try:
result = gcp.compute.firewalls().delete(
project=gcp.project,
firewall=gcp.health_check_firewall_rule.name).execute()
firewall=gcp.health_check_firewall_rule.name).execute(
num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
Expand All @@ -790,7 +801,8 @@ def delete_firewall(gcp):
def delete_health_check(gcp):
try:
result = gcp.compute.healthChecks().delete(
project=gcp.project, healthCheck=gcp.health_check.name).execute()
project=gcp.project, healthCheck=gcp.health_check.name).execute(
num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
Expand All @@ -802,7 +814,8 @@ def delete_instance_groups(gcp):
result = gcp.compute.instanceGroupManagers().delete(
project=gcp.project,
zone=instance_group.zone,
instanceGroupManager=instance_group.name).execute()
instanceGroupManager=instance_group.name).execute(
num_retries=_GCP_API_RETRIES)
wait_for_zone_operation(gcp,
instance_group.zone,
result['name'],
Expand All @@ -815,7 +828,8 @@ def delete_instance_template(gcp):
try:
result = gcp.compute.instanceTemplates().delete(
project=gcp.project,
instanceTemplate=gcp.instance_template.name).execute()
instanceTemplate=gcp.instance_template.name).execute(
num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])
except googleapiclient.errors.HttpError as http_error:
logger.info('Delete failed: %s', http_error)
Expand All @@ -839,7 +853,7 @@ def patch_backend_instances(gcp,
logger.debug('Sending GCP request with body=%s', config)
result = compute_to_use.backendServices().patch(
project=gcp.project, backendService=backend_service.name,
body=config).execute()
body=config).execute(num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp,
result['name'],
timeout_sec=_WAIT_FOR_BACKEND_SEC)
Expand All @@ -853,7 +867,7 @@ def resize_instance_group(gcp,
project=gcp.project,
zone=instance_group.zone,
instanceGroupManager=instance_group.name,
size=new_size).execute()
size=new_size).execute(num_retries=_GCP_API_RETRIES)
wait_for_zone_operation(gcp,
instance_group.zone,
result['name'],
Expand All @@ -865,7 +879,7 @@ def resize_instance_group(gcp,
break
if time.time() - start_time > timeout_sec:
raise Exception('Failed to resize primary instance group')
time.sleep(1)
time.sleep(2)


def patch_url_map_backend_service(gcp, backend_service):
Expand All @@ -878,9 +892,9 @@ def patch_url_map_backend_service(gcp, backend_service):
}]
}
logger.debug('Sending GCP request with body=%s', config)
result = gcp.compute.urlMaps().patch(project=gcp.project,
urlMap=gcp.url_map.name,
body=config).execute()
result = gcp.compute.urlMaps().patch(
project=gcp.project, urlMap=gcp.url_map.name,
body=config).execute(num_retries=_GCP_API_RETRIES)
wait_for_global_operation(gcp, result['name'])


Expand All @@ -890,12 +904,13 @@ def wait_for_global_operation(gcp,
start_time = time.time()
while time.time() - start_time <= timeout_sec:
result = gcp.compute.globalOperations().get(
project=gcp.project, operation=operation).execute()
project=gcp.project,
operation=operation).execute(num_retries=_GCP_API_RETRIES)
if result['status'] == 'DONE':
if 'error' in result:
raise Exception(result['error'])
return
time.sleep(1)
time.sleep(2)
raise Exception('Operation %s did not complete within %d', operation,
timeout_sec)

Expand All @@ -907,12 +922,13 @@ def wait_for_zone_operation(gcp,
start_time = time.time()
while time.time() - start_time <= timeout_sec:
result = gcp.compute.zoneOperations().get(
project=gcp.project, zone=zone, operation=operation).execute()
project=gcp.project, zone=zone,
operation=operation).execute(num_retries=_GCP_API_RETRIES)
if result['status'] == 'DONE':
if 'error' in result:
raise Exception(result['error'])
return
time.sleep(1)
time.sleep(2)
raise Exception('Operation %s did not complete within %d', operation,
timeout_sec)

Expand All @@ -927,7 +943,7 @@ def wait_for_healthy_backends(gcp,
result = gcp.compute.backendServices().getHealth(
project=gcp.project,
backendService=backend_service.name,
body=config).execute()
body=config).execute(num_retries=_GCP_API_RETRIES)
if 'healthStatus' in result:
healthy = True
for instance in result['healthStatus']:
Expand All @@ -936,7 +952,7 @@ def wait_for_healthy_backends(gcp,
break
if healthy:
return
time.sleep(1)
time.sleep(2)
raise Exception('Not all backends became healthy within %d seconds: %s' %
(timeout_sec, result))

Expand All @@ -949,7 +965,7 @@ def get_instance_names(gcp, instance_group):
instanceGroup=instance_group.name,
body={
'instanceState': 'ALL'
}).execute()
}).execute(num_retries=_GCP_API_RETRIES)
if 'items' not in result:
return []
for item in result['items']:
Expand Down Expand Up @@ -1081,34 +1097,39 @@ def __init__(self, compute, alpha_compute, project):
if not gcp.instance_template:
result = compute.instanceTemplates().get(
project=args.project_id,
instanceTemplate=template_name).execute()
instanceTemplate=template_name).execute(
num_retries=_GCP_API_RETRIES)
gcp.instance_template = GcpResource(template_name,
result['selfLink'])
if not gcp.backend_services:
result = compute.backendServices().get(
project=args.project_id,
backendService=backend_service_name).execute()
backendService=backend_service_name).execute(
num_retries=_GCP_API_RETRIES)
backend_service = GcpResource(backend_service_name,
result['selfLink'])
gcp.backend_services.append(backend_service)
result = compute.backendServices().get(
project=args.project_id,
backendService=alternate_backend_service_name).execute()
backendService=alternate_backend_service_name).execute(
num_retries=_GCP_API_RETRIES)
alternate_backend_service = GcpResource(
alternate_backend_service_name, result['selfLink'])
gcp.backend_services.append(alternate_backend_service)
if not gcp.instance_groups:
result = compute.instanceGroups().get(
project=args.project_id,
zone=args.zone,
instanceGroup=instance_group_name).execute()
instanceGroup=instance_group_name).execute(
num_retries=_GCP_API_RETRIES)
instance_group = InstanceGroup(instance_group_name,
result['selfLink'], args.zone)
gcp.instance_groups.append(instance_group)
result = compute.instanceGroups().get(
project=args.project_id,
zone=args.zone,
instanceGroup=same_zone_instance_group_name).execute()
instanceGroup=same_zone_instance_group_name).execute(
num_retries=_GCP_API_RETRIES)
same_zone_instance_group = InstanceGroup(
same_zone_instance_group_name, result['selfLink'],
args.zone)
Expand All @@ -1118,20 +1139,22 @@ def __init__(self, compute, alpha_compute, project):
project=args.project_id,
zone=args.secondary_zone,
instanceGroup=secondary_zone_instance_group_name
).execute()
).execute(num_retries=_GCP_API_RETRIES)
secondary_zone_instance_group = InstanceGroup(
secondary_zone_instance_group_name, result['selfLink'],
args.secondary_zone)
gcp.instance_groups.append(secondary_zone_instance_group)
if not gcp.health_check:
result = compute.healthChecks().get(
project=args.project_id,
healthCheck=health_check_name).execute()
healthCheck=health_check_name).execute(
num_retries=_GCP_API_RETRIES)
gcp.health_check = GcpResource(health_check_name,
result['selfLink'])
if not gcp.url_map:
result = compute.urlMaps().get(project=args.project_id,
urlMap=url_map_name).execute()
result = compute.urlMaps().get(
project=args.project_id,
urlMap=url_map_name).execute(num_retries=_GCP_API_RETRIES)
gcp.url_map = GcpResource(url_map_name, result['selfLink'])
if not gcp.service_port:
gcp.service_port = args.service_port_range[0]
Expand Down

0 comments on commit 0dc94dc

Please sign in to comment.