Skip to content

Commit

Permalink
move generic communication substrate into its own 'mesh' package
Browse files Browse the repository at this point in the history
This is mostly a matter of moving some files, changing package names,
changing some imports, and changing some type references.

We duplicate the odd constant & global var, and the intmac/macint
functions, since neither 'router' or 'mesh' nor, really,' common' are
a natural home for them.
  • Loading branch information
rade committed Nov 9, 2015
1 parent e5c6980 commit dc9f876
Show file tree
Hide file tree
Showing 47 changed files with 304 additions and 254 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ $(NETCHECK_EXE): common/*.go common/*/*.go net/*.go
go build $(BUILD_FLAGS) -o $@ ./$(@D)
$(NETGO_CHECK)

$(WEAVER_EXE): router/*.go ipam/*.go ipam/*/*.go nameserver/*.go prog/weaver/*.go
$(WEAVER_EXE): router/*.go mesh/*.go ipam/*.go ipam/*/*.go nameserver/*.go prog/weaver/*.go
$(WEAVEPROXY_EXE): proxy/*.go prog/weaveproxy/main.go
$(NETCHECK_EXE): prog/netcheck/netcheck.go

Expand Down
74 changes: 37 additions & 37 deletions ipam/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"github.com/weaveworks/weave/ipam/paxos"
"github.com/weaveworks/weave/ipam/ring"
"github.com/weaveworks/weave/ipam/space"
"github.com/weaveworks/weave/mesh"
"github.com/weaveworks/weave/net/address"
"github.com/weaveworks/weave/router"
)

// Kinds of message we can unicast to other peers
Expand Down Expand Up @@ -43,39 +43,39 @@ type operation interface {
// are used around data structures.
type Allocator struct {
actionChan chan<- func()
ourName router.PeerName
ourName mesh.PeerName
universe address.Range // superset of all ranges
ring *ring.Ring // information on ranges owned by all peers
space space.Space // more detail on ranges owned by us
owned map[string][]address.Address // who owns what addresses, indexed by container-ID
nicknames map[router.PeerName]string // so we can map nicknames for rmpeer
nicknames map[mesh.PeerName]string // so we can map nicknames for rmpeer
pendingAllocates []operation // held until we get some free space
pendingClaims []operation // held until we know who owns the space
gossip router.Gossip // our link to the outside world for sending messages
gossip mesh.Gossip // our link to the outside world for sending messages
paxos *paxos.Node
paxosTicker *time.Ticker
shuttingDown bool // to avoid doing any requests while trying to shut down
isKnownPeer func(router.PeerName) bool
isKnownPeer func(mesh.PeerName) bool
now func() time.Time
}

// NewAllocator creates and initialises a new Allocator
func NewAllocator(ourName router.PeerName, ourUID router.PeerUID, ourNickname string, universe address.Range, quorum uint, isKnownPeer func(name router.PeerName) bool) *Allocator {
func NewAllocator(ourName mesh.PeerName, ourUID mesh.PeerUID, ourNickname string, universe address.Range, quorum uint, isKnownPeer func(name mesh.PeerName) bool) *Allocator {
return &Allocator{
ourName: ourName,
universe: universe,
ring: ring.New(universe.Start, universe.End, ourName),
owned: make(map[string][]address.Address),
paxos: paxos.NewNode(ourName, ourUID, quorum),
nicknames: map[router.PeerName]string{ourName: ourNickname},
nicknames: map[mesh.PeerName]string{ourName: ourNickname},
isKnownPeer: isKnownPeer,
now: time.Now,
}
}

// Start runs the allocator goroutine
func (alloc *Allocator) Start() {
actionChan := make(chan func(), router.ChannelSize)
actionChan := make(chan func(), mesh.ChannelSize)
alloc.actionChan = actionChan
go alloc.actorLoop(actionChan)
}
Expand Down Expand Up @@ -165,7 +165,7 @@ func (alloc *Allocator) tryPendingOps() {
}
}

func (alloc *Allocator) spaceRequestDenied(sender router.PeerName, r address.Range) {
func (alloc *Allocator) spaceRequestDenied(sender mesh.PeerName, r address.Range) {
for i := 0; i < len(alloc.pendingClaims); {
claim := alloc.pendingClaims[i].(*claim)
if r.Contains(claim.addr) {
Expand Down Expand Up @@ -276,27 +276,27 @@ func (alloc *Allocator) Free(ident string, addrToFree address.Address) error {
return <-errChan
}

func (alloc *Allocator) pickPeerFromNicknames(isValid func(router.PeerName) bool) router.PeerName {
func (alloc *Allocator) pickPeerFromNicknames(isValid func(mesh.PeerName) bool) mesh.PeerName {
for name := range alloc.nicknames {
if name != alloc.ourName && isValid(name) {
return name
}
}
return router.UnknownPeerName
return mesh.UnknownPeerName
}

func (alloc *Allocator) pickPeerForTransfer() router.PeerName {
func (alloc *Allocator) pickPeerForTransfer() mesh.PeerName {
// first try alive peers that actively participate in IPAM (i.e. have entries)
if heir := alloc.ring.PickPeerForTransfer(alloc.isKnownPeer); heir != router.UnknownPeerName {
if heir := alloc.ring.PickPeerForTransfer(alloc.isKnownPeer); heir != mesh.UnknownPeerName {
return heir
}
// next try alive peers that have IPAM enabled but have no entries
if heir := alloc.pickPeerFromNicknames(alloc.isKnownPeer); heir != router.UnknownPeerName {
if heir := alloc.pickPeerFromNicknames(alloc.isKnownPeer); heir != mesh.UnknownPeerName {
return heir
}
// next try disappeared peers that still have entries
t := func(router.PeerName) bool { return true }
if heir := alloc.ring.PickPeerForTransfer(t); heir != router.UnknownPeerName {
t := func(mesh.PeerName) bool { return true }
if heir := alloc.ring.PickPeerForTransfer(t); heir != mesh.UnknownPeerName {
return heir
}
// finally, disappeared peers that that passively participated in IPAM
Expand All @@ -311,7 +311,7 @@ func (alloc *Allocator) Shutdown() {
alloc.shuttingDown = true
alloc.cancelOps(&alloc.pendingClaims)
alloc.cancelOps(&alloc.pendingAllocates)
if heir := alloc.pickPeerForTransfer(); heir != router.UnknownPeerName {
if heir := alloc.pickPeerForTransfer(); heir != mesh.UnknownPeerName {
alloc.ring.Transfer(alloc.ourName, heir)
alloc.space.Clear()
alloc.gossip.GossipBroadcast(alloc.Gossip())
Expand Down Expand Up @@ -350,14 +350,14 @@ func (alloc *Allocator) AdminTakeoverRanges(peerNameOrNickname string) error {
// call into the router for this because we are interested in peers
// that have gone away but are still in the ring, which is why we
// maintain our own nicknames map.
func (alloc *Allocator) lookupPeername(name string) (router.PeerName, error) {
func (alloc *Allocator) lookupPeername(name string) (mesh.PeerName, error) {
for peername, nickname := range alloc.nicknames {
if nickname == name {
return peername, nil
}
}

return router.PeerNameFromString(name)
return mesh.PeerNameFromString(name)
}

// Restrict the peers in "nicknames" to those in the ring plus peers known to the router
Expand All @@ -370,7 +370,7 @@ func (alloc *Allocator) pruneNicknames() {
}
}

func (alloc *Allocator) annotatePeernames(names []router.PeerName) []string {
func (alloc *Allocator) annotatePeernames(names []mesh.PeerName) []string {
var res []string
for _, name := range names {
if nickname, found := alloc.nicknames[name]; found {
Expand All @@ -388,7 +388,7 @@ func decodeRange(msg []byte) (r address.Range, err error) {
}

// OnGossipUnicast (Sync)
func (alloc *Allocator) OnGossipUnicast(sender router.PeerName, msg []byte) error {
func (alloc *Allocator) OnGossipUnicast(sender mesh.PeerName, msg []byte) error {
alloc.debugln("OnGossipUnicast from", sender, ": ", len(msg), "bytes")
resultChan := make(chan error)
alloc.actionChan <- func() {
Expand All @@ -414,7 +414,7 @@ func (alloc *Allocator) OnGossipUnicast(sender router.PeerName, msg []byte) erro
}

// OnGossipBroadcast (Sync)
func (alloc *Allocator) OnGossipBroadcast(sender router.PeerName, msg []byte) (router.GossipData, error) {
func (alloc *Allocator) OnGossipBroadcast(sender mesh.PeerName, msg []byte) (mesh.GossipData, error) {
alloc.debugln("OnGossipBroadcast from", sender, ":", len(msg), "bytes")
resultChan := make(chan error)
alloc.actionChan <- func() {
Expand All @@ -427,7 +427,7 @@ type gossipState struct {
// We send a timstamp along with the information to be
// gossipped in order to detect skewed clocks
Now int64
Nicknames map[router.PeerName]string
Nicknames map[mesh.PeerName]string

Paxos paxos.GossipState
Ring *ring.Ring
Expand Down Expand Up @@ -463,11 +463,11 @@ func (alloc *Allocator) Encode() []byte {
}

// OnGossip (Sync)
func (alloc *Allocator) OnGossip(msg []byte) (router.GossipData, error) {
func (alloc *Allocator) OnGossip(msg []byte) (mesh.GossipData, error) {
alloc.debugln("Allocator.OnGossip:", len(msg), "bytes")
resultChan := make(chan error)
alloc.actionChan <- func() {
resultChan <- alloc.update(router.UnknownPeerName, msg)
resultChan <- alloc.update(mesh.UnknownPeerName, msg)
}
return nil, <-resultChan // for now, we never propagate updates. TBD
}
Expand All @@ -478,7 +478,7 @@ type ipamGossipData struct {
alloc *Allocator
}

func (d *ipamGossipData) Merge(other router.GossipData) {
func (d *ipamGossipData) Merge(other mesh.GossipData) {
// no-op
}

Expand All @@ -488,12 +488,12 @@ func (d *ipamGossipData) Encode() [][]byte {

// Gossip returns a GossipData implementation, which in this case always
// returns the latest ring state (and does nothing on merge)
func (alloc *Allocator) Gossip() router.GossipData {
func (alloc *Allocator) Gossip() mesh.GossipData {
return &ipamGossipData{alloc}
}

// SetInterfaces gives the allocator two interfaces for talking to the outside world
func (alloc *Allocator) SetInterfaces(gossip router.Gossip) {
func (alloc *Allocator) SetInterfaces(gossip mesh.Gossip) {
alloc.gossip = gossip
}

Expand Down Expand Up @@ -540,7 +540,7 @@ func (alloc *Allocator) establishRing() {
}
}

func (alloc *Allocator) createRing(peers []router.PeerName) {
func (alloc *Allocator) createRing(peers []mesh.PeerName) {
alloc.debugln("Paxos consensus:", peers)
alloc.ring.ClaimForPeers(normalizeConsensus(peers))
alloc.gossip.GossipBroadcast(alloc.Gossip())
Expand All @@ -563,7 +563,7 @@ func (alloc *Allocator) ringUpdated() {
}

// For compatibility with sort.Interface
type peerNames []router.PeerName
type peerNames []mesh.PeerName

func (a peerNames) Len() int { return len(a) }
func (a peerNames) Less(i, j int) bool { return a[i] < a[j] }
Expand All @@ -572,7 +572,7 @@ func (a peerNames) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// When we get a consensus from Paxos, the peer names are not in a
// defined order and may contain duplicates. This function sorts them
// and de-dupes.
func normalizeConsensus(consensus []router.PeerName) []router.PeerName {
func normalizeConsensus(consensus []mesh.PeerName) []mesh.PeerName {
if len(consensus) == 0 {
return nil
}
Expand Down Expand Up @@ -607,22 +607,22 @@ func encodeRange(r address.Range) []byte {
return buf.Bytes()
}

func (alloc *Allocator) sendSpaceRequest(dest router.PeerName, r address.Range) error {
func (alloc *Allocator) sendSpaceRequest(dest mesh.PeerName, r address.Range) error {
msg := append([]byte{msgSpaceRequest}, encodeRange(r)...)
return alloc.gossip.GossipUnicast(dest, msg)
}

func (alloc *Allocator) sendSpaceRequestDenied(dest router.PeerName, r address.Range) error {
func (alloc *Allocator) sendSpaceRequestDenied(dest mesh.PeerName, r address.Range) error {
msg := append([]byte{msgSpaceRequestDenied}, encodeRange(r)...)
return alloc.gossip.GossipUnicast(dest, msg)
}

func (alloc *Allocator) sendRingUpdate(dest router.PeerName) {
func (alloc *Allocator) sendRingUpdate(dest mesh.PeerName) {
msg := append([]byte{msgRingUpdate}, alloc.encode()...)
alloc.gossip.GossipUnicast(dest, msg)
}

func (alloc *Allocator) update(sender router.PeerName, msg []byte) error {
func (alloc *Allocator) update(sender mesh.PeerName, msg []byte) error {
reader := bytes.NewReader(msg)
decoder := gob.NewDecoder(reader)
var data gossipState
Expand Down Expand Up @@ -674,7 +674,7 @@ func (alloc *Allocator) update(sender router.PeerName, msg []byte) error {
alloc.createRing(cons.Value)
}
}
} else if sender != router.UnknownPeerName {
} else if sender != mesh.UnknownPeerName {
// Sender is trying to initialize a ring, but we have one
// already - send it straight back
alloc.sendRingUpdate(sender)
Expand All @@ -684,7 +684,7 @@ func (alloc *Allocator) update(sender router.PeerName, msg []byte) error {
return nil
}

func (alloc *Allocator) donateSpace(r address.Range, to router.PeerName) {
func (alloc *Allocator) donateSpace(r address.Range, to mesh.PeerName) {
// No matter what we do, we'll send a unicast gossip
// of our ring back to tha chap who asked for space.
// This serves to both tell him of any space we might
Expand Down
6 changes: 3 additions & 3 deletions ipam/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"fmt"

"github.com/weaveworks/weave/common"
"github.com/weaveworks/weave/mesh"
"github.com/weaveworks/weave/net/address"
"github.com/weaveworks/weave/router"
)

type claim struct {
Expand Down Expand Up @@ -43,7 +43,7 @@ func (c *claim) Try(alloc *Allocator) bool {
switch owner := alloc.ring.Owner(c.addr); owner {
case alloc.ourName:
// success
case router.UnknownPeerName:
case mesh.UnknownPeerName:
// If our ring doesn't know, it must be empty.
if c.noErrorOnUnknown {
alloc.infof("Claim %s for %s: address allocator still initializing; will try later.", c.addr, c.ident)
Expand Down Expand Up @@ -86,7 +86,7 @@ func (c *claim) Try(alloc *Allocator) bool {
return true
}

func (c *claim) deniedBy(alloc *Allocator, owner router.PeerName) {
func (c *claim) deniedBy(alloc *Allocator, owner mesh.PeerName) {
name, found := alloc.nicknames[owner]
if found {
name = " (" + name + ")"
Expand Down
12 changes: 6 additions & 6 deletions ipam/paxos/paxos.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package paxos

import (
"github.com/weaveworks/weave/router"
"github.com/weaveworks/weave/mesh"
)

// The node identifier. The use of the UID here is important: Paxos
// acceptors must not forget their promises, so it's important that a
// node does not restart and lose its Paxos state but claim to have
// the same ID.
type NodeID struct {
Name router.PeerName
UID router.PeerUID
Name mesh.PeerName
UID mesh.PeerUID
}

// note all fields exported in structs so we can Gob them
Expand Down Expand Up @@ -39,7 +39,7 @@ func (a ProposalID) valid() bool {
}

// For seeding IPAM, the value we want consensus on is a set of peer names
type Value []router.PeerName
type Value []mesh.PeerName

// An AcceptedValue is a Value plus the proposal which originated that
// Value. The origin is not essential, but makes comparing
Expand Down Expand Up @@ -72,7 +72,7 @@ type Node struct {
knows GossipState
}

func NewNode(name router.PeerName, uid router.PeerUID, quorum uint) *Node {
func NewNode(name mesh.PeerName, uid mesh.PeerUID, quorum uint) *Node {
return &Node{
id: NodeID{name, uid},
quorum: quorum,
Expand Down Expand Up @@ -223,7 +223,7 @@ func (node *Node) Think() bool {
// about. This is not necessarily all peer names, but it is at least
// a quorum, and so good enough for seeding the ring.
func (node *Node) pickValue() Value {
val := make([]router.PeerName, len(node.knows))
val := make([]mesh.PeerName, len(node.knows))
i := 0
for id := range node.knows {
val[i] = id.Name
Expand Down
10 changes: 5 additions & 5 deletions ipam/paxos/paxos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"testing"
"time"

"github.com/weaveworks/weave/router"
"github.com/weaveworks/weave/mesh"
)

type TestNode struct {
Expand Down Expand Up @@ -97,8 +97,8 @@ func makeRandomModel(params *TestParams, r *rand.Rand, t *testing.T) *Model {
}

for i := range m.nodes {
m.nodes[i].Node = NewNode(router.PeerName(i/2+1),
router.PeerUID(r.Int63()), m.quorum)
m.nodes[i].Node = NewNode(mesh.PeerName(i/2+1),
mesh.PeerUID(r.Int63()), m.quorum)
m.nodes[i].Propose()
}

Expand Down Expand Up @@ -172,8 +172,8 @@ func (m *Model) isolateNode(node *TestNode) {

// Restart a node
func (m *Model) restart(node *TestNode) {
node.Node = NewNode(router.PeerName(m.nextID),
router.PeerUID(m.r.Int63()), m.quorum)
node.Node = NewNode(mesh.PeerName(m.nextID),
mesh.PeerUID(m.r.Int63()), m.quorum)
m.nextID++
node.Propose()

Expand Down
Loading

0 comments on commit dc9f876

Please sign in to comment.