Skip to content

Commit

Permalink
[SPARK-3398] [SPARK-4325] [EC2] Use EC2 status checks.
Browse files Browse the repository at this point in the history
This PR re-introduces [0e648bc](apache@0e648bc) from PR apache#2339, which somehow never made it into the codebase.

Additionally, it removes a now-unnecessary linear backoff on the SSH checks since we are blocking on EC2 status checks before testing SSH.

Author: Nicholas Chammas <[email protected]>

Closes apache#3195 from nchammas/remove-ec2-ssh-backoff and squashes the following commits:

efb29e1 [Nicholas Chammas] Revert "Remove linear backoff."
ef3ca99 [Nicholas Chammas] reuse conn
adb4eaa [Nicholas Chammas] Remove linear backoff.
55caa24 [Nicholas Chammas] Check EC2 status checks before SSH.
  • Loading branch information
nchammas authored and JoshRosen committed Nov 29, 2014
1 parent 047ff57 commit 317e114
Showing 1 changed file with 36 additions and 12 deletions.
48 changes: 36 additions & 12 deletions ec2/spark_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import time
import urllib2
import warnings
from datetime import datetime
from optparse import OptionParser
from sys import stderr
import boto
Expand Down Expand Up @@ -589,7 +590,9 @@ def setup_spark_cluster(master, opts):


def is_ssh_available(host, opts):
"Checks if SSH is available on the host."
"""
Check if SSH is available on a host.
"""
try:
with open(os.devnull, 'w') as devnull:
ret = subprocess.check_call(
Expand All @@ -604,36 +607,48 @@ def is_ssh_available(host, opts):


def is_cluster_ssh_available(cluster_instances, opts):
"""
Check if SSH is available on all the instances in a cluster.
"""
for i in cluster_instances:
if not is_ssh_available(host=i.ip_address, opts=opts):
return False
else:
return True


def wait_for_cluster_state(cluster_instances, cluster_state, opts):
def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state):
"""
Wait for all the instances in the cluster to reach a designated state.
cluster_instances: a list of boto.ec2.instance.Instance
cluster_state: a string representing the desired state of all the instances in the cluster
value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as
'running', 'terminated', etc.
(would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250)
"""
sys.stdout.write(
"Waiting for all instances in cluster to enter '{s}' state.".format(s=cluster_state)
"Waiting for cluster to enter '{s}' state.".format(s=cluster_state)
)
sys.stdout.flush()

start_time = datetime.now()

num_attempts = 0
conn = ec2.connect_to_region(opts.region)

while True:
time.sleep(3 * num_attempts)
time.sleep(5 * num_attempts) # seconds

for i in cluster_instances:
s = i.update() # capture output to suppress print to screen in newer versions of boto
i.update()

statuses = conn.get_all_instance_status(instance_ids=[i.id for i in cluster_instances])

if cluster_state == 'ssh-ready':
if all(i.state == 'running' for i in cluster_instances) and \
all(s.system_status.status == 'ok' for s in statuses) and \
all(s.instance_status.status == 'ok' for s in statuses) and \
is_cluster_ssh_available(cluster_instances, opts):
break
else:
Expand All @@ -647,6 +662,12 @@ def wait_for_cluster_state(cluster_instances, cluster_state, opts):

sys.stdout.write("\n")

end_time = datetime.now()
print "Cluster is now in '{s}' state. Waited {t} seconds.".format(
s=cluster_state,
t=(end_time - start_time).seconds
)


# Get number of local disks available for a given EC2 instance type.
def get_num_disks(instance_type):
Expand Down Expand Up @@ -895,7 +916,7 @@ def real_main():
# See: https://docs.python.org/3.5/whatsnew/2.7.html
warnings.warn(
"This option is deprecated and has no effect. "
"spark-ec2 automatically waits as long as necessary for clusters to startup.",
"spark-ec2 automatically waits as long as necessary for clusters to start up.",
DeprecationWarning
)

Expand All @@ -922,9 +943,10 @@ def real_main():
else:
(master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
wait_for_cluster_state(
conn=conn,
opts=opts,
cluster_instances=(master_nodes + slave_nodes),
cluster_state='ssh-ready',
opts=opts
cluster_state='ssh-ready'
)
setup_cluster(conn, master_nodes, slave_nodes, opts, True)

Expand All @@ -951,9 +973,10 @@ def real_main():
print "Deleting security groups (this will take some time)..."
group_names = [cluster_name + "-master", cluster_name + "-slaves"]
wait_for_cluster_state(
conn=conn,
opts=opts,
cluster_instances=(master_nodes + slave_nodes),
cluster_state='terminated',
opts=opts
cluster_state='terminated'
)
attempt = 1
while attempt <= 3:
Expand Down Expand Up @@ -1055,9 +1078,10 @@ def real_main():
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
wait_for_cluster_state(
conn=conn,
opts=opts,
cluster_instances=(master_nodes + slave_nodes),
cluster_state='ssh-ready',
opts=opts
cluster_state='ssh-ready'
)
setup_cluster(conn, master_nodes, slave_nodes, opts, False)

Expand Down

0 comments on commit 317e114

Please sign in to comment.