Skip to content

Commit

Permalink
chore(operator/inventory): drain sigexit channel on shutdown (akash-n…
Browse files Browse the repository at this point in the history
…etwork#194)

Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
troian authored Feb 14, 2024
1 parent 2356d12 commit 893690e
Showing 1 changed file with 29 additions and 1 deletion.
30 changes: 29 additions & 1 deletion operator/inventory/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,13 @@ func newClusterNodes(ctx context.Context, image, namespace string) *clusterNodes
}

func (cl *clusterNodes) Wait() error {
return cl.group.Wait()
log := fromctx.LogrFromCtx(cl.ctx).WithName("nodes")

log.Info("waiting for nodes to finish")
err := cl.group.Wait()
log.Info("all nodes finished")

return err
}

func (cl *clusterNodes) connector() error {
Expand All @@ -92,12 +98,33 @@ func (cl *clusterNodes) connector() error {

nctx, ncancel := context.WithCancel(ctx)
defer func() {
log.Info("shutting down node connectors")

ncancel()

lctx, lcancel := context.WithCancel(context.Background())

go func() {
for {
select {
case <-lctx.Done():
return
case <-cl.signaldone:
}
}
}()

for name, node := range nodes {
log.Info(fmt.Sprintf("shutting down node %s", name))
_ = node.shutdown()
delete(nodes, name)

log.Info(fmt.Sprintf("node %s has been shutdown", name))
}

lcancel()

log.Info("node connectors down")
}()

for {
Expand Down Expand Up @@ -153,6 +180,7 @@ func (cl *clusterNodes) run() error {

events := bus.Sub(topicInventoryNode)
defer bus.Unsub(events)

for {
select {
case <-cl.ctx.Done():
Expand Down

0 comments on commit 893690e

Please sign in to comment.