Skip to content

Commit

Permalink
[SPARK-3405] add subnet-id and vpc-id options to spark_ec2.py
Browse files Browse the repository at this point in the history
Based on this gist:
https://gist.github.com/amar-analytx/0b62543621e1f246c0a2

We use security group ids instead of security group to get around this issue:
boto/boto#350

Author: Mike Jennings <[email protected]>
Author: Mike Jennings <[email protected]>

Closes apache#2872 from mvj101/SPARK-3405 and squashes the following commits:

be9cb43 [Mike Jennings] `pep8 spark_ec2.py` runs cleanly.
4dc6756 [Mike Jennings] Remove duplicate comment
731d94c [Mike Jennings] Update for code review.
ad90a36 [Mike Jennings] Merge branch 'master' of https://github.com/apache/spark into SPARK-3405
1ebffa1 [Mike Jennings] Merge branch 'master' into SPARK-3405
52aaeec [Mike Jennings] [SPARK-3405] add subnet-id and vpc-id options to spark_ec2.py
  • Loading branch information
mvj101 authored and JoshRosen committed Dec 16, 2014
1 parent cb48447 commit d12c071
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 15 deletions.
19 changes: 19 additions & 0 deletions docs/ec2-scripts.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,25 @@ another.
permissions on your private key file, you can run `launch` with the
`--resume` option to restart the setup process on an existing cluster.

# Launching a Cluster in a VPC

- Run
`./spark-ec2 -k <keypair> -i <key-file> -s <num-slaves> --vpc-id=<vpc-id> --subnet-id=<subnet-id> launch <cluster-name>`,
where `<keypair>` is the name of your EC2 key pair (that you gave it
when you created it), `<key-file>` is the private key file for your
key pair, `<num-slaves>` is the number of slave nodes to launch (try
1 at first), `<vpc-id>` is the ID of your VPC, `<subnet-id>` is the
ID of your subnet, and `<cluster-name>` is the name to give to your
cluster.

For example:

```bash
export AWS_SECRET_ACCESS_KEY=AaBbCcDdEeFGgHhIiJjKkLlMmNnOoPpQqRrSsTtU
export AWS_ACCESS_KEY_ID=ABCDEFG1234567890123
./spark-ec2 --key-pair=awskey --identity-file=awskey.pem --region=us-west-1 --zone=us-west-1a --vpc-id=vpc-a28d24c7 --subnet-id=subnet-4eb27b39 --spark-version=1.1.0 launch my-spark-cluster
```

# Running Applications

- Go into the `ec2` directory in the release of Spark you downloaded.
Expand Down
66 changes: 51 additions & 15 deletions ec2/spark_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ def parse_args():
parser.add_option(
"--copy-aws-credentials", action="store_true", default=False,
help="Add AWS credentials to hadoop configuration to allow Spark to access S3")
parser.add_option(
"--subnet-id", default=None, help="VPC subnet to launch instances in")
parser.add_option(
"--vpc-id", default=None, help="VPC to launch instances in")

(opts, args) = parser.parse_args()
if len(args) != 2:
Expand All @@ -186,14 +190,14 @@ def parse_args():


# Get the EC2 security group of the given name, creating it if it doesn't exist
def get_or_make_group(conn, name):
def get_or_make_group(conn, name, vpc_id):
groups = conn.get_all_security_groups()
group = [g for g in groups if g.name == name]
if len(group) > 0:
return group[0]
else:
print "Creating security group " + name
return conn.create_security_group(name, "Spark EC2 group")
return conn.create_security_group(name, "Spark EC2 group", vpc_id)


# Check whether a given EC2 instance object is in a state we consider active,
Expand Down Expand Up @@ -303,12 +307,26 @@ def launch_cluster(conn, opts, cluster_name):
user_data_content = user_data_file.read()

print "Setting up security groups..."
master_group = get_or_make_group(conn, cluster_name + "-master")
slave_group = get_or_make_group(conn, cluster_name + "-slaves")
master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id)
slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id)
authorized_address = opts.authorized_address
if master_group.rules == []: # Group was just now created
master_group.authorize(src_group=master_group)
master_group.authorize(src_group=slave_group)
if opts.vpc_id is None:
master_group.authorize(src_group=master_group)
master_group.authorize(src_group=slave_group)
else:
master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
src_group=master_group)
master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
src_group=master_group)
master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
src_group=master_group)
master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
src_group=slave_group)
master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
src_group=slave_group)
master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
src_group=slave_group)
master_group.authorize('tcp', 22, 22, authorized_address)
master_group.authorize('tcp', 8080, 8081, authorized_address)
master_group.authorize('tcp', 18080, 18080, authorized_address)
Expand All @@ -320,8 +338,22 @@ def launch_cluster(conn, opts, cluster_name):
if opts.ganglia:
master_group.authorize('tcp', 5080, 5080, authorized_address)
if slave_group.rules == []: # Group was just now created
slave_group.authorize(src_group=master_group)
slave_group.authorize(src_group=slave_group)
if opts.vpc_id is None:
slave_group.authorize(src_group=master_group)
slave_group.authorize(src_group=slave_group)
else:
slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
src_group=master_group)
slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
src_group=master_group)
slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
src_group=master_group)
slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
src_group=slave_group)
slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
src_group=slave_group)
slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
src_group=slave_group)
slave_group.authorize('tcp', 22, 22, authorized_address)
slave_group.authorize('tcp', 8080, 8081, authorized_address)
slave_group.authorize('tcp', 50060, 50060, authorized_address)
Expand All @@ -341,11 +373,12 @@ def launch_cluster(conn, opts, cluster_name):
if opts.ami is None:
opts.ami = get_spark_ami(opts)

additional_groups = []
# we use group ids to work around https://github.com/boto/boto/issues/350
additional_group_ids = []
if opts.additional_security_group:
additional_groups = [sg
for sg in conn.get_all_security_groups()
if opts.additional_security_group in (sg.name, sg.id)]
additional_group_ids = [sg.id
for sg in conn.get_all_security_groups()
if opts.additional_security_group in (sg.name, sg.id)]
print "Launching instances..."

try:
Expand Down Expand Up @@ -392,9 +425,10 @@ def launch_cluster(conn, opts, cluster_name):
placement=zone,
count=num_slaves_this_zone,
key_name=opts.key_pair,
security_groups=[slave_group] + additional_groups,
security_group_ids=[slave_group.id] + additional_group_ids,
instance_type=opts.instance_type,
block_device_map=block_map,
subnet_id=opts.subnet_id,
user_data=user_data_content)
my_req_ids += [req.id for req in slave_reqs]
i += 1
Expand Down Expand Up @@ -441,12 +475,13 @@ def launch_cluster(conn, opts, cluster_name):
num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
if num_slaves_this_zone > 0:
slave_res = image.run(key_name=opts.key_pair,
security_groups=[slave_group] + additional_groups,
security_group_ids=[slave_group.id] + additional_group_ids,
instance_type=opts.instance_type,
placement=zone,
min_count=num_slaves_this_zone,
max_count=num_slaves_this_zone,
block_device_map=block_map,
subnet_id=opts.subnet_id,
user_data=user_data_content)
slave_nodes += slave_res.instances
print "Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone,
Expand All @@ -467,12 +502,13 @@ def launch_cluster(conn, opts, cluster_name):
if opts.zone == 'all':
opts.zone = random.choice(conn.get_all_zones()).name
master_res = image.run(key_name=opts.key_pair,
security_groups=[master_group] + additional_groups,
security_group_ids=[master_group.id] + additional_group_ids,
instance_type=master_type,
placement=opts.zone,
min_count=1,
max_count=1,
block_device_map=block_map,
subnet_id=opts.subnet_id,
user_data=user_data_content)
master_nodes = master_res.instances
print "Launched master in %s, regid = %s" % (zone, master_res.id)
Expand Down

0 comments on commit d12c071

Please sign in to comment.