forked from spegel-org/spegel
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
161 lines (144 loc) · 5.05 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package main
import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
"os"
"os/signal"
"syscall"
"time"
"github.com/alexflint/go-arg"
"github.com/containerd/containerd"
"github.com/go-logr/logr"
"github.com/go-logr/zapr"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/afero"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
pkgkubernetes "github.com/xenitab/pkg/kubernetes"
"github.com/xenitab/spegel/internal/mirror"
"github.com/xenitab/spegel/internal/registry"
"github.com/xenitab/spegel/internal/routing"
"github.com/xenitab/spegel/internal/state"
)
type ConfigurationCmd struct {
ContainerdRegistryConfigPath string `arg:"--containerd-registry-config-path" default:"/etc/containerd/certs.d" help:"Directory where mirror configuration is written."`
Registries []url.URL `arg:"--registries,required" help:"registries that are configured to be mirrored."`
MirrorRegistries []url.URL `arg:"--mirror-registries,required" help:"registries that are configured to act as mirrors."`
}
type RegistryCmd struct {
RegistryAddr string `arg:"--registry-addr,required" help:"address to server image registry."`
RouterAddr string `arg:"--router-addr,required" help:"address to serve router."`
MetricsAddr string `arg:"--metrics-addr,required" help:"address to serve metrics."`
Registries []url.URL `arg:"--registries,required" help:"registries that are configured to be mirrored."`
ImageFilter string `arg:"--image-filter" help:"inclusive image name filter."`
ContainerdSock string `arg:"--containerd-sock" default:"/run/containerd/containerd.sock" help:"Endpoint of containerd service."`
ContainerdNamespace string `arg:"--containerd-namespace" default:"k8s.io" help:"Containerd namespace to fetch images from."`
KubeconfigPath string `arg:"--kubeconfig-path" help:"Path to the kubeconfig file."`
LeaderElectionNamespace string `arg:"--leader-election-namespace" default:"spegel" help:"Kubernetes namespace to write leader election data."`
LeaderElectionName string `arg:"--leader-election-name" default:"spegel-leader-election" help:"Name of leader election."`
}
type Arguments struct {
Configuration *ConfigurationCmd `arg:"subcommand:configuration"`
Registry *RegistryCmd `arg:"subcommand:registry"`
}
func main() {
args := &Arguments{}
arg.MustParse(args)
zapLog, err := zap.NewProduction()
if err != nil {
panic(fmt.Sprintf("who watches the watchmen (%v)?", err))
}
log := zapr.NewLogger(zapLog)
ctx := logr.NewContext(context.Background(), log)
err = run(ctx, args)
if err != nil {
log.Error(err, "")
os.Exit(1)
}
log.Info("gracefully shutdown")
}
func run(ctx context.Context, args *Arguments) error {
ctx, cancel := signal.NotifyContext(ctx, syscall.SIGTERM)
defer cancel()
switch {
case args.Configuration != nil:
return configurationCommand(ctx, args.Configuration)
case args.Registry != nil:
return registryCommand(ctx, args.Registry)
default:
return fmt.Errorf("unknown subcommand")
}
}
func configurationCommand(ctx context.Context, args *ConfigurationCmd) error {
fs := afero.NewOsFs()
err := mirror.AddMirrorConfiguration(ctx, fs, args.ContainerdRegistryConfigPath, args.Registries, args.MirrorRegistries)
if err != nil {
return err
}
return nil
}
func registryCommand(ctx context.Context, args *RegistryCmd) (err error) {
log := logr.FromContextOrDiscard(ctx)
g, ctx := errgroup.WithContext(ctx)
cs, err := pkgkubernetes.GetKubernetesClientset(args.KubeconfigPath)
if err != nil {
return err
}
containerdClient, err := containerd.New(args.ContainerdSock, containerd.WithDefaultNamespace(args.ContainerdNamespace))
if err != nil {
return fmt.Errorf("could not create containerd client: %w", err)
}
defer func() {
err = errors.Join(err, containerdClient.Close())
}()
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
srv := &http.Server{
Addr: args.MetricsAddr,
Handler: mux,
}
g.Go(func() error {
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}
return nil
})
g.Go(func() error {
<-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return srv.Shutdown(shutdownCtx)
})
bootstrapper := routing.NewKubernetesBootstrapper(cs, args.LeaderElectionNamespace, args.LeaderElectionName)
router, err := routing.NewP2PRouter(ctx, args.RouterAddr, bootstrapper)
if err != nil {
return err
}
g.Go(func() error {
<-ctx.Done()
return router.Close()
})
g.Go(func() error {
return state.Track(ctx, containerdClient, router, args.Registries, args.ImageFilter)
})
reg, err := registry.NewRegistry(ctx, args.RegistryAddr, containerdClient, router)
if err != nil {
return err
}
g.Go(func() error {
return reg.ListenAndServe(ctx)
})
g.Go(func() error {
<-ctx.Done()
return reg.Shutdown()
})
log.Info("running registry", "addr", args.RegistryAddr)
err = g.Wait()
if err != nil {
return err
}
return nil
}