Skip to content

Commit

Permalink
Merge pull request flannel-io#338 from eyakubovich/new-backend-interf…
Browse files Browse the repository at this point in the history
…aces

Refactor the backend interfaces for multi-networks
  • Loading branch information
eyakubovich committed Oct 18, 2015
2 parents 9889a4d + 878c525 commit 86ec894
Show file tree
Hide file tree
Showing 18 changed files with 1,284 additions and 975 deletions.
39 changes: 18 additions & 21 deletions backend/alloc/alloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,45 @@ package alloc

import (
"fmt"
"net"

"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/flannel/backend"
"github.com/coreos/flannel/pkg/ip"
"github.com/coreos/flannel/subnet"
)

func init() {
backend.Register("alloc", New)
}

type AllocBackend struct {
sm subnet.Manager
publicIP ip.IP4
mtu int
lease *subnet.Lease
extIface *backend.ExternalInterface
}

func New(sm subnet.Manager, extIface *net.Interface, extIaddr net.IP, extEaddr net.IP) (backend.Backend, error) {
func New(sm subnet.Manager, extIface *backend.ExternalInterface) (backend.Backend, error) {
be := AllocBackend{
sm: sm,
publicIP: ip.FromIP(extEaddr),
mtu: extIface.MTU,
sm: sm,
extIface: extIface,
}
return &be, nil
}

func (m *AllocBackend) RegisterNetwork(ctx context.Context, network string, config *subnet.Config) (*backend.SubnetDef, error) {
func (_ *AllocBackend) Run(ctx context.Context) {
<-ctx.Done()
}

func (be *AllocBackend) RegisterNetwork(ctx context.Context, network string, config *subnet.Config) (backend.Network, error) {
attrs := subnet.LeaseAttrs{
PublicIP: m.publicIP,
PublicIP: ip.FromIP(be.extIface.ExtAddr),
}

l, err := m.sm.AcquireLease(ctx, network, &attrs)
l, err := be.sm.AcquireLease(ctx, network, &attrs)
switch err {
case nil:
m.lease = l
return &backend.SubnetDef{
Lease: l,
MTU: m.mtu,
return &backend.SimpleNetwork{
SubnetLease: l,
ExtIface: be.extIface,
}, nil

case context.Canceled, context.DeadlineExceeded:
Expand All @@ -47,9 +50,3 @@ func (m *AllocBackend) RegisterNetwork(ctx context.Context, network string, conf
return nil, fmt.Errorf("failed to acquire lease: %v", err)
}
}

func (m *AllocBackend) Run(ctx context.Context) {
}

func (m *AllocBackend) UnregisterNetwork(ctx context.Context, name string) {
}
90 changes: 43 additions & 47 deletions backend/awsvpc/awsvpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package awsvpc
import (
"encoding/json"
"fmt"
"net"

"github.com/coreos/flannel/Godeps/_workspace/src/github.com/aws/aws-sdk-go/aws"
"github.com/coreos/flannel/Godeps/_workspace/src/github.com/aws/aws-sdk-go/aws/awserr"
Expand All @@ -31,42 +30,47 @@ import (
"github.com/coreos/flannel/subnet"
)

func init() {
backend.Register("aws-vpc", New)
}

type AwsVpcBackend struct {
sm subnet.Manager
publicIP ip.IP4
mtu int
cfg struct {
RouteTableID string
}
lease *subnet.Lease
extIface *backend.ExternalInterface
}

func New(sm subnet.Manager, extIface *net.Interface, extIaddr net.IP, extEaddr net.IP) (backend.Backend, error) {
func New(sm subnet.Manager, extIface *backend.ExternalInterface) (backend.Backend, error) {
be := AwsVpcBackend{
sm: sm,
publicIP: ip.FromIP(extEaddr),
mtu: extIface.MTU,
extIface: extIface,
}
return &be, nil
}

func (m *AwsVpcBackend) RegisterNetwork(ctx context.Context, network string, config *subnet.Config) (*backend.SubnetDef, error) {
func (be *AwsVpcBackend) Run(ctx context.Context) {
<-ctx.Done()
}

func (be *AwsVpcBackend) RegisterNetwork(ctx context.Context, network string, config *subnet.Config) (backend.Network, error) {
// Parse our configuration
cfg := struct {
RouteTableID string
}{}

if len(config.Backend) > 0 {
if err := json.Unmarshal(config.Backend, &m.cfg); err != nil {
if err := json.Unmarshal(config.Backend, &cfg); err != nil {
return nil, fmt.Errorf("error decoding VPC backend config: %v", err)
}
}

// Acquire the lease form subnet manager
attrs := subnet.LeaseAttrs{
PublicIP: m.publicIP,
PublicIP: ip.FromIP(be.extIface.ExtAddr),
}

l, err := m.sm.AcquireLease(ctx, network, &attrs)
l, err := be.sm.AcquireLease(ctx, network, &attrs)
switch err {
case nil:
m.lease = l

case context.Canceled, context.DeadlineExceeded:
return nil, err
Expand All @@ -88,20 +92,20 @@ func (m *AwsVpcBackend) RegisterNetwork(ctx context.Context, network string, con

ec2c := ec2.New(&aws.Config{Region: aws.String(region)})

if _, err = m.disableSrcDestCheck(instanceID, ec2c); err != nil {
if _, err = be.disableSrcDestCheck(instanceID, ec2c); err != nil {
log.Infof("Warning- disabling source destination check failed: %v", err)
}

if m.cfg.RouteTableID == "" {
if cfg.RouteTableID == "" {
log.Infof("RouteTableID not passed as config parameter, detecting ...")
if err := m.detectRouteTableID(instanceID, ec2c); err != nil {
if cfg.RouteTableID, err = be.detectRouteTableID(instanceID, ec2c); err != nil {
return nil, err
}
}

log.Info("RouteRouteTableID: ", m.cfg.RouteTableID)
log.Info("RouteRouteTableID: ", cfg.RouteTableID)

matchingRouteFound, err := m.checkMatchingRoutes(instanceID, l.Subnet.String(), ec2c)
matchingRouteFound, err := be.checkMatchingRoutes(cfg.RouteTableID, instanceID, l.Subnet.String(), ec2c)
if err != nil {
log.Errorf("Error describing route tables: %v", err)

Expand All @@ -114,7 +118,7 @@ func (m *AwsVpcBackend) RegisterNetwork(ctx context.Context, network string, con

if !matchingRouteFound {
cidrBlock := l.Subnet.String()
deleteRouteInput := &ec2.DeleteRouteInput{RouteTableId: &m.cfg.RouteTableID, DestinationCidrBlock: &cidrBlock}
deleteRouteInput := &ec2.DeleteRouteInput{RouteTableId: &cfg.RouteTableID, DestinationCidrBlock: &cidrBlock}
if _, err := ec2c.DeleteRoute(deleteRouteInput); err != nil {
if ec2err, ok := err.(awserr.Error); !ok || ec2err.Code() != "InvalidRoute.NotFound" {
// an error other than the route not already existing occurred
Expand All @@ -123,25 +127,25 @@ func (m *AwsVpcBackend) RegisterNetwork(ctx context.Context, network string, con
}

// Add the route for this machine's subnet
if _, err := m.createRoute(instanceID, l.Subnet.String(), ec2c); err != nil {
if _, err := be.createRoute(cfg.RouteTableID, instanceID, l.Subnet.String(), ec2c); err != nil {
return nil, fmt.Errorf("unable to add route %s: %v", l.Subnet.String(), err)
}
}

return &backend.SubnetDef{
Lease: l,
MTU: m.mtu,
return &backend.SimpleNetwork{
SubnetLease: l,
ExtIface: be.extIface,
}, nil
}

func (m *AwsVpcBackend) checkMatchingRoutes(instanceID, subnet string, ec2c *ec2.EC2) (bool, error) {
func (be *AwsVpcBackend) checkMatchingRoutes(routeTableID, instanceID, subnet string, ec2c *ec2.EC2) (bool, error) {
matchingRouteFound := false

filter := newFilter()
filter.Add("route.destination-cidr-block", subnet)
filter.Add("route.state", "active")

input := ec2.DescribeRouteTablesInput{Filters: filter, RouteTableIds: []*string{&m.cfg.RouteTableID}}
input := ec2.DescribeRouteTablesInput{Filters: filter, RouteTableIds: []*string{&routeTableID}}

resp, err := ec2c.DescribeRouteTables(&input)
if err != nil {
Expand All @@ -165,16 +169,17 @@ func (m *AwsVpcBackend) checkMatchingRoutes(instanceID, subnet string, ec2c *ec2
return matchingRouteFound, nil
}

func (m *AwsVpcBackend) createRoute(instanceID, subnet string, ec2c *ec2.EC2) (*ec2.CreateRouteOutput, error) {
func (be *AwsVpcBackend) createRoute(routeTableID, instanceID, subnet string, ec2c *ec2.EC2) (*ec2.CreateRouteOutput, error) {
route := &ec2.CreateRouteInput{
RouteTableId: &m.cfg.RouteTableID,
RouteTableId: &routeTableID,
InstanceId: &instanceID,
DestinationCidrBlock: &subnet,
}

return ec2c.CreateRoute(route)
}
func (m *AwsVpcBackend) disableSrcDestCheck(instanceID string, ec2c *ec2.EC2) (*ec2.ModifyInstanceAttributeOutput, error) {

func (be *AwsVpcBackend) disableSrcDestCheck(instanceID string, ec2c *ec2.EC2) (*ec2.ModifyInstanceAttributeOutput, error) {
modifyAttributes := &ec2.ModifyInstanceAttributeInput{
InstanceId: aws.String(instanceID),
SourceDestCheck: &ec2.AttributeBooleanValue{Value: aws.Bool(false)},
Expand All @@ -183,22 +188,22 @@ func (m *AwsVpcBackend) disableSrcDestCheck(instanceID string, ec2c *ec2.EC2) (*
return ec2c.ModifyInstanceAttribute(modifyAttributes)
}

func (m *AwsVpcBackend) detectRouteTableID(instanceID string, ec2c *ec2.EC2) error {
func (be *AwsVpcBackend) detectRouteTableID(instanceID string, ec2c *ec2.EC2) (string, error) {
instancesInput := &ec2.DescribeInstancesInput{
InstanceIds: []*string{&instanceID},
}

resp, err := ec2c.DescribeInstances(instancesInput)
if err != nil {
return fmt.Errorf("error getting instance info: %v", err)
return "", fmt.Errorf("error getting instance info: %v", err)
}

if len(resp.Reservations) == 0 {
return fmt.Errorf("no reservations found")
return "", fmt.Errorf("no reservations found")
}

if len(resp.Reservations[0].Instances) == 0 {
return fmt.Errorf("no matching instance found with id: %v", instanceID)
return "", fmt.Errorf("no matching instance found with id: %v", instanceID)
}

subnetID := resp.Reservations[0].Instances[0].SubnetId
Expand All @@ -216,12 +221,11 @@ func (m *AwsVpcBackend) detectRouteTableID(instanceID string, ec2c *ec2.EC2) err

res, err := ec2c.DescribeRouteTables(routeTablesInput)
if err != nil {
return fmt.Errorf("error describing routeTables for subnetID %s: %v", *subnetID, err)
return "", fmt.Errorf("error describing routeTables for subnetID %s: %v", *subnetID, err)
}

if len(res.RouteTables) != 0 {
m.cfg.RouteTableID = *res.RouteTables[0].RouteTableId
return nil
return *res.RouteTables[0].RouteTableId, nil
}

filter = newFilter()
Expand All @@ -238,16 +242,8 @@ func (m *AwsVpcBackend) detectRouteTableID(instanceID string, ec2c *ec2.EC2) err
}

if len(res.RouteTables) == 0 {
return fmt.Errorf("main route table not found")
return "", fmt.Errorf("main route table not found")
}

m.cfg.RouteTableID = *res.RouteTables[0].RouteTableId

return nil
}

func (m *AwsVpcBackend) Run(ctx context.Context) {
}

func (m *AwsVpcBackend) UnregisterNetwork(ctx context.Context, name string) {
return *res.RouteTables[0].RouteTableId, nil
}
41 changes: 33 additions & 8 deletions backend/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
package backend

import (
"net"

"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"

"github.com/coreos/flannel/subnet"
)

type SubnetDef struct {
Lease *subnet.Lease
MTU int
type ExternalInterface struct {
Iface *net.Interface
IfaceAddr net.IP
ExtAddr net.IP
}

// Besides the entry points in the Backend interface, the backend's New()
Expand All @@ -37,11 +40,33 @@ type SubnetDef struct {
// since multiple RegisterNetwork() and Run() calls may be in-flight at any
// given time for a singleton backend, it must protect these calls with a mutex.
type Backend interface {
// Called first to start the necessary event loops and such
Run(ctx context.Context)
// Called when the backend should create or begin managing a new network
RegisterNetwork(ctx context.Context, network string, config *subnet.Config) (*SubnetDef, error)
// Called after the backend's first network has been registered to
// allow the plugin to watch dynamic events
RegisterNetwork(ctx context.Context, network string, config *subnet.Config) (Network, error)
}

type Network interface {
Lease() *subnet.Lease
MTU() int
Run(ctx context.Context)
// Called to clean up any network resources or operations
UnregisterNetwork(ctx context.Context, network string)
}

type BackendCtor func(sm subnet.Manager, ei *ExternalInterface) (Backend, error)

type SimpleNetwork struct {
SubnetLease *subnet.Lease
ExtIface *ExternalInterface
}

func (n *SimpleNetwork) Lease() *subnet.Lease {
return n.SubnetLease
}

func (n *SimpleNetwork) MTU() int {
return n.ExtIface.Iface.MTU
}

func (_ *SimpleNetwork) Run(ctx context.Context) {
<-ctx.Done()
}
Loading

0 comments on commit 86ec894

Please sign in to comment.