Skip to content

Commit

Permalink
feat: support reload cfg by ticker
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 committed Oct 7, 2023
1 parent eb82f48 commit 6e632db
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 67 deletions.
163 changes: 104 additions & 59 deletions cmd/ehco/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,18 @@ import (

// flags
var (
LocalAddr string
ListenType string
TCPRemoteAddr string
UDPRemoteAddr string
TransportType string
ConfigPath string
WebPort int
WebToken string
EnablePing bool
SystemFilePath = "/etc/systemd/system/ehco.service"
LogLevel string
LocalAddr string
ListenType string
TCPRemoteAddr string
UDPRemoteAddr string
TransportType string
ConfigPath string
WebPort int
WebToken string
EnablePing bool
SystemFilePath = "/etc/systemd/system/ehco.service"
LogLevel string
ConfigReloadInterval int
)

var cmdLogger *zap.SugaredLogger
Expand Down Expand Up @@ -138,6 +139,13 @@ func createCliAPP() *cli.App {
Destination: &WebToken,
DefaultText: "info",
},
&cli.IntFlag{
Name: "config_reload_interval",
Usage: "config reload interval",
EnvVars: []string{"EHCO_CONFIG_RELOAD_INTERVAL"},
Destination: &ConfigReloadInterval,
DefaultText: "60",
},
}

app.Commands = []*cli.Command{
Expand Down Expand Up @@ -166,18 +174,18 @@ func createCliAPP() *cli.App {

func loadConfig() (cfg *config.Config, err error) {
if ConfigPath != "" {
cfg = config.NewConfigByPath(ConfigPath)
cfg = config.NewConfig(ConfigPath)
if err := cfg.LoadConfig(); err != nil {
return nil, err
}
} else {
// prepare config from cli args
cfg = &config.Config{
WebPort: WebPort,
WebToken: WebToken,
EnablePing: EnablePing,
PATH: ConfigPath,
LogLeveL: LogLevel,
WebPort: WebPort,
WebToken: WebToken,
EnablePing: EnablePing,
PATH: ConfigPath,
LogLeveL: LogLevel,
ReloadInterval: ConfigReloadInterval,
RelayConfigs: []config.RelayConfig{
{
Listen: LocalAddr,
Expand Down Expand Up @@ -245,7 +253,7 @@ func startRelayServers(ctx context.Context, cfg *config.Config) error {
}
// start watch config file TODO support reload from http , refine the ConfigPath global var
if ConfigPath != "" {
go watchAndReloadConfig(ctx, relayM, errCH)
go watchAndReloadRelayConfig(ctx, cfg, relayM, errCH)
}

select {
Expand All @@ -262,55 +270,92 @@ func startRelayServers(ctx context.Context, cfg *config.Config) error {
}
}

func watchAndReloadConfig(ctx context.Context, relayM *sync.Map, errCh chan error) {
func watchAndReloadRelayConfig(ctx context.Context, curCfg *config.Config, relayM *sync.Map, errCh chan error) {
cmdLogger.Infof("Start to watch config file: %s ", ConfigPath)
reladRelay := func() error {
newCfg, err := loadConfig()
if err != nil {
cmdLogger.Errorf("Reloading Realy Conf meet error: %s ", err)
return err
}
var newRelayAddrList []string
for idx := range newCfg.RelayConfigs {
r, err := relay.NewRelay(&newCfg.RelayConfigs[idx])
if err != nil {
cmdLogger.Errorf("reload new relay failed err=%s", err.Error())
return err
}
newRelayAddrList = append(newRelayAddrList, r.Name)
// reload old relay
if oldR, ok := relayM.Load(r.Name); ok {
oldR := oldR.(*relay.Relay)
if oldR.Name != r.Name {
cmdLogger.Infof("close old relay name=%s", oldR.Name)
stopOneRelay(oldR, relayM)
go startOneRelay(r, relayM, errCh)
}
continue // no need to reload
}
// start bread new relay that not in old relayM
cmdLogger.Infof("starr new relay name=%s", r.Name)
go startOneRelay(r, relayM, errCh)
}
// closed relay not in new config
relayM.Range(func(key, value interface{}) bool {
oldAddr := key.(string)
if !inArray(oldAddr, newRelayAddrList) {
v, _ := relayM.Load(oldAddr)
oldR := v.(*relay.Relay)
stopOneRelay(oldR, relayM)
}
return true
})
return nil
}

reloadCH := make(chan os.Signal, 1)
signal.Notify(reloadCH, syscall.SIGHUP)
reloadCH := make(chan struct{}, 1)

// listen syscall.SIGHUP to trigger reload
sigHubCH := make(chan os.Signal, 1)
signal.Notify(sigHubCH, syscall.SIGHUP)
go func() {
for {
select {
case <-ctx.Done():
return
case <-sigHubCH:
cmdLogger.Info("Now Reloading Relay Conf By HUP Signal! ")
reloadCH <- struct{}{}
}
}
}()

// ticker to reload config
if curCfg.ReloadInterval > 0 {
ticker := time.NewTicker(time.Second * time.Duration(curCfg.ReloadInterval))
defer ticker.Stop()
go func() {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
cmdLogger.Info("Now Reloading Relay Conf By Ticker! ")
reloadCH <- struct{}{}
}
}
}()
}

for {
select {
case <-ctx.Done():
return
case <-reloadCH:
cmdLogger.Info("Got A HUP Signal! Now Reloading Conf")
newCfg, err := loadConfig()
if err != nil {
cmdLogger.Fatalf("Reloading Conf meet error: %s ", err)
}

var newRelayAddrList []string
for idx := range newCfg.RelayConfigs {
r, err := relay.NewRelay(&newCfg.RelayConfigs[idx])
if err != nil {
cmdLogger.Fatalf("reload new relay failed err=%s", err.Error())
}
newRelayAddrList = append(newRelayAddrList, r.Name)

// reload old relay
if oldR, ok := relayM.Load(r.Name); ok {
oldR := oldR.(*relay.Relay)
if oldR.Name != r.Name {
cmdLogger.Infof("close old relay name=%s", oldR.Name)
stopOneRelay(oldR, relayM)
go startOneRelay(r, relayM, errCh)
}
continue // no need to reload
}
// start bread new relay that not in old relayM
cmdLogger.Infof("starr new relay name=%s", r.Name)
go startOneRelay(r, relayM, errCh)
if err := reladRelay(); err != nil {
cmdLogger.Errorf("Reloading Relay Conf meet error: %s ", err)
errCh <- err
}
// closed relay not in new config
relayM.Range(func(key, value interface{}) bool {
oldAddr := key.(string)
if !inArray(oldAddr, newRelayAddrList) {
v, _ := relayM.Load(oldAddr)
oldR := v.(*relay.Relay)
stopOneRelay(oldR, relayM)
}
return true
})
}
}
}
Expand Down
1 change: 1 addition & 0 deletions examples/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"web_token": "",
"enable_ping": false,
"log_level": "debug",
"reload_interval": 5,
"relay_configs": [
{
"listen": "127.0.0.1:1234",
Expand Down
20 changes: 12 additions & 8 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/Ehco1996/ehco/internal/constant"
"github.com/xtls/xray-core/infra/conf"
"go.uber.org/zap"
)

type RelayConfig struct {
Expand Down Expand Up @@ -64,18 +65,21 @@ func (r *RelayConfig) Validate() error {
type Config struct {
PATH string

WebPort int `json:"web_port,omitempty"`
WebToken string `json:"web_token,omitempty"`
EnablePing bool `json:"enable_ping,omitempty"`
LogLeveL string `json:"log_level,omitempty"`
WebPort int `json:"web_port,omitempty"`
WebToken string `json:"web_token,omitempty"`
EnablePing bool `json:"enable_ping,omitempty"`
LogLeveL string `json:"log_level,omitempty"`
ReloadInterval int `json:"reload_interval,omitempty"`

RelayConfigs []RelayConfig `json:"relay_configs"`
XRayConfig *conf.Config `json:"xray_config,omitempty"`
SyncTrafficEndPoint string `json:"sync_traffic_endpoint"`

L *zap.SugaredLogger
}

func NewConfigByPath(path string) *Config {
return &Config{PATH: path, RelayConfigs: []RelayConfig{}}
func NewConfig(path string) *Config {
return &Config{PATH: path, RelayConfigs: []RelayConfig{}, L: zap.L().Sugar().Named("cfg")}
}

func (c *Config) NeedSyncUserFromServer() bool {
Expand All @@ -100,7 +104,7 @@ func (c *Config) readFromFile() error {
if err != nil {
return err
}
println("Load Config From file:", c.PATH)
c.L.Infof("Load Config From file: %s", c.PATH)
if err != nil {
return err
}
Expand All @@ -114,7 +118,7 @@ func (c *Config) readFromHttp() error {
return err
}
defer r.Body.Close()
println("Load Config From http:", c.PATH)
c.L.Infof("Load Config From HTTP: %s", c.PATH)
return json.NewDecoder(r.Body).Decode(&c)
}

Expand Down

0 comments on commit 6e632db

Please sign in to comment.