Skip to content

Commit

Permalink
Enable stopping and starting a spot cluster
Browse files (browse the repository at this point in the history)
  • Loading branch information
ankurdave committed Nov 12, 2013
1 parent 23b53ef commit bc9f7ea
Showing 1 changed file with 32 additions and 19 deletions.
51 changes: 32 additions & 19 deletions ec2/spark_ec2.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -101,6 +101,8 @@ def parse_args():
help="The SSH user you want to connect as (default: root)")
parser.add_option("--delete-groups", action="store_true", default=False,
help="When destroying a cluster, delete the security groups that were created")
parser.add_option("--use-existing-master", action="store_true", default=False,
help="Launch fresh slaves, but use an existing stopped master if possible")

(opts, args) = parser.parse_args()
if len(args) != 2:
Expand Down Expand Up @@ -233,9 +235,9 @@ def launch_cluster(conn, opts, cluster_name):
slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')

# Check if instances are already running in our groups
active_nodes = get_existing_cluster(conn, opts, cluster_name,
die_on_error=False)
if any(active_nodes):
existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name,
die_on_error=False)
if existing_slaves or (existing_masters and not opts.use_existing_master):
print >> stderr, ("ERROR: There are already instances running in " +
"group %s or %s" % (master_group.name, slave_group.name))
sys.exit(1)
Expand Down Expand Up @@ -336,21 +338,28 @@ def launch_cluster(conn, opts, cluster_name):
zone, slave_res.id)
i += 1

# Launch masters
master_type = opts.master_instance_type
if master_type == "":
master_type = opts.instance_type
if opts.zone == 'all':
opts.zone = random.choice(conn.get_all_zones()).name
master_res = image.run(key_name = opts.key_pair,
security_groups = [master_group],
instance_type = master_type,
placement = opts.zone,
min_count = 1,
max_count = 1,
block_device_map = block_map)
master_nodes = master_res.instances
print "Launched master in %s, regid = %s" % (zone, master_res.id)
# Launch or resume masters
if existing_masters:
print "Starting master..."
for inst in existing_masters:
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
master_nodes = existing_masters
else:
master_type = opts.master_instance_type
if master_type == "":
master_type = opts.instance_type
if opts.zone == 'all':
opts.zone = random.choice(conn.get_all_zones()).name
master_res = image.run(key_name = opts.key_pair,
security_groups = [master_group],
instance_type = master_type,
placement = opts.zone,
min_count = 1,
max_count = 1,
block_device_map = block_map)
master_nodes = master_res.instances
print "Launched master in %s, regid = %s" % (zone, master_res.id)

# Return all the instances
return (master_nodes, slave_nodes)
Expand Down Expand Up @@ -732,6 +741,7 @@ def real_main():
cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " +
"BUT THE CLUSTER WILL KEEP USING SPACE ON\n" +
"AMAZON EBS IF IT IS EBS-BACKED!!\n" +
"All data on spot-instance slaves will be lost.\n" +
"Stop cluster " + cluster_name + " (y/N): ")
if response == "y":
(master_nodes, slave_nodes) = get_existing_cluster(
Expand All @@ -743,7 +753,10 @@ def real_main():
print "Stopping slaves..."
for inst in slave_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.stop()
if inst.spot_instance_request_id:
inst.terminate()
else:
inst.stop()

elif action == "start":
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
Expand Down

0 comments on commit bc9f7ea

Please sign in to comment.