Skip to content

Commit

Permalink
Remove excessive waiting in kubelet etcd loop
Browse files Browse the repository at this point in the history
Listen to etcd longer, and wait a shorter time before reconnecting.
No longer an argument to the source.
  • Loading branch information
smarterclayton committed Aug 20, 2014
1 parent 9bafb8c commit 34031db
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 10 deletions.
4 changes: 2 additions & 2 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func startComponents(manifestURL string) (apiServerURL string) {

// Kubelet (localhost)
cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[0]), etcdClient, 30*time.Second, cfg1.Channel("etcd"))
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[0]), etcdClient, cfg1.Channel("etcd"))
config.NewSourceURL(manifestURL, 5*time.Second, cfg1.Channel("url"))
myKubelet := kubelet.NewIntegrationTestKubelet(machineList[0], &fakeDocker1)
go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0)
Expand All @@ -125,7 +125,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
// Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule.
cfg2 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[1]), etcdClient, 30*time.Second, cfg2.Channel("etcd"))
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[1]), etcdClient, cfg2.Channel("etcd"))
otherKubelet := kubelet.NewIntegrationTestKubelet(machineList[1], &fakeDocker2)
go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0)
go util.Forever(func() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func main() {
if len(etcdServerList) > 0 {
glog.Infof("Watching for etcd configs at %v", etcdServerList)
etcdClient = etcd.NewClient(etcdServerList)
kconfig.NewSourceEtcd(kconfig.EtcdKeyForHost(hostname), etcdClient, 30*time.Second, cfg.Channel("etcd"))
kconfig.NewSourceEtcd(kconfig.EtcdKeyForHost(hostname), etcdClient, cfg.Channel("etcd"))
}

// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
Expand Down
17 changes: 11 additions & 6 deletions pkg/kubelet/config/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,21 @@ type SourceEtcd struct {
client tools.EtcdClient
updates chan<- interface{}

waitDuration time.Duration
interval time.Duration
timeout time.Duration
}

// NewSourceEtcd creates a config source that watches and pulls from a key in etcd
func NewSourceEtcd(key string, client tools.EtcdClient, period time.Duration, updates chan<- interface{}) *SourceEtcd {
func NewSourceEtcd(key string, client tools.EtcdClient, updates chan<- interface{}) *SourceEtcd {
config := &SourceEtcd{
key: key,
client: client,
updates: updates,

waitDuration: period,
timeout: 1 * time.Minute,
}
glog.Infof("Watching etcd for %s", key)
go util.Forever(config.run, period)
go util.Forever(config.run, time.Second)
return config
}

Expand All @@ -81,7 +82,7 @@ func (s *SourceEtcd) fetchNextState(fromIndex uint64) (nextIndex uint64, err err
if fromIndex == 0 {
response, err = s.client.Get(s.key, true, false)
} else {
response, err = s.client.Watch(s.key, fromIndex, false, nil, stopChannel(s.waitDuration))
response, err = s.client.Watch(s.key, fromIndex, false, nil, stopChannel(s.timeout))
if tools.IsEtcdWatchStoppedByUser(err) {
return fromIndex, nil
}
Expand Down Expand Up @@ -125,9 +126,13 @@ func responseToPods(response *etcd.Response) ([]kubelet.Pod, error) {
return pods, nil
}

// stopChannel creates a channel that is closed after a duration for use with etcd client API
// stopChannel creates a channel that is closed after a duration for use with etcd client API.
// If until is 0, the channel will never close.
func stopChannel(until time.Duration) chan bool {
stop := make(chan bool)
if until == 0 {
return stop
}
go func() {
select {
case <-time.After(until):
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/config/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestGetEtcdData(t *testing.T) {
},
E: nil,
}
NewSourceEtcd("/registry/hosts/machine/kubelet", fakeClient, time.Millisecond, ch)
NewSourceEtcd("/registry/hosts/machine/kubelet", fakeClient, ch)

//TODO: update FakeEtcdClient.Watch to handle receiver=nil with a given index
//returns an infinite stream of updates
Expand Down

0 comments on commit 34031db

Please sign in to comment.