This repository has been archived by the owner on Mar 4, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
main.go
139 lines (114 loc) · 4.33 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package main
import (
"context"
"crypto/tls"
"flag"
"net"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/coreos/go-semver/semver"
"github.com/fnproject/lb/lb"
"github.com/sirupsen/logrus"
)
const VERSION = "0.0.240"
func main() {
// XXX (reed): normalize
level, err := logrus.ParseLevel(os.Getenv("LOG_LEVEL"))
if err != nil {
level = logrus.InfoLevel
}
logrus.SetLevel(level)
fnodes := flag.String("nodes", "", "comma separated list of functions nodes")
minAPIVersion := flag.String("min-api-version", "0.0.207", "minimal node API to accept")
var conf lb.Config
flag.StringVar(&conf.DBurl, "db", "sqlite3://:memory:", "backend to store nodes, default to in memory; use k8s for kuberneted")
flag.StringVar(&conf.Listen, "listen", ":8081", "port to run on")
flag.StringVar(&conf.MgmtListen, "mgmt-listen", ":8081", "management port to run on")
flag.IntVar(&conf.ShutdownTimeout, "shutdown-timeout", 0, "graceful shutdown timeout")
flag.IntVar(&conf.HealthcheckInterval, "hc-interval", 3, "how often to check f(x) nodes, in seconds")
flag.StringVar(&conf.HealthcheckEndpoint, "hc-path", "/version", "endpoint to determine node health")
flag.IntVar(&conf.HealthcheckUnhealthy, "hc-unhealthy", 2, "threshold of failed checks to declare node unhealthy")
flag.IntVar(&conf.HealthcheckHealthy, "hc-healthy", 1, "threshold of success checks to declare node healthy")
flag.IntVar(&conf.HealthcheckTimeout, "hc-timeout", 5, "timeout of healthcheck endpoint, in seconds")
flag.StringVar(&conf.ZipkinURL, "zipkin", "", "zipkin endpoint to send traces")
flag.StringVar(&conf.Namespace, "namespace", "", "kubernetes namespace to monitor")
flag.StringVar(&conf.LabelSelector, "label-selector", "", "kubernetes label selector to monitor")
flag.IntVar(&conf.TargetPort, "target-port", 8080, "kubernetes port to target on selected pods")
flag.Parse()
conf.MinAPIVersion = semver.New(*minAPIVersion)
if len(*fnodes) > 0 {
// starting w/o nodes is fine too
conf.Nodes = strings.Split(*fnodes, ",")
}
conf.Transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 120 * time.Second,
}).Dial,
MaxIdleConnsPerHost: 512,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: &tls.Config{
ClientSessionCache: tls.NewLRUClientSessionCache(4096),
},
}
db, err := lb.NewDB(conf) // Handles case where DBurl == "k8s"
if err != nil {
logrus.WithError(err).Fatal("error setting up database")
}
defer db.Close()
g, err := lb.NewAllGrouper(conf, db)
if err != nil {
logrus.WithError(err).Fatal("error setting up grouper")
}
r := lb.NewConsistentRouter(conf)
k := func(r *http.Request) (string, error) {
return r.URL.Path, nil
}
servers := make([]*http.Server, 0, 1)
handler := lb.NewProxy(k, g, r, conf)
// a separate mgmt listener is requested? then let's create a LB traffic only server
if conf.Listen != conf.MgmtListen {
servers = append(servers, &http.Server{Addr: conf.Listen, Handler: handler})
handler = lb.NullHandler()
}
// add mgmt endpoints to the handler
handler = g.Wrap(handler) // add/del/list endpoints
handler = r.Wrap(handler) // stats / dash endpoint
servers = append(servers, &http.Server{Addr: conf.MgmtListen, Handler: handler})
serve(servers, &conf)
}
func serve(servers []*http.Server, conf *lb.Config) {
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGQUIT, syscall.SIGINT)
for i := 0; i < len(servers); i++ {
go func(idx int) {
err := servers[idx].ListenAndServe()
if err != nil && err != http.ErrServerClosed {
logrus.WithFields(logrus.Fields{"server_id": idx}).WithError(err).Fatal("server error")
} else {
logrus.WithFields(logrus.Fields{"server_id": idx}).Info("server stopped")
}
}(i)
}
sig := <-ch
logrus.WithFields(logrus.Fields{"signal": sig}).Info("received signal")
for i := 0; i < len(servers); i++ {
ctx := context.Background()
if conf.ShutdownTimeout > 0 {
tmpCtx, cancel := context.WithTimeout(context.Background(), time.Duration(conf.ShutdownTimeout)*time.Second)
ctx = tmpCtx
defer cancel()
}
err := servers[i].Shutdown(ctx) // safe shutdown
if err != nil {
logrus.WithFields(logrus.Fields{"server_id": i}).WithError(err).Fatal("server shutdown error")
} else {
logrus.WithFields(logrus.Fields{"server_id": i}).Info("server shutdown")
}
}
}