Skip to content

Commit

Permalink
Fix file backend (kelseyhightower#686)
Browse files Browse the repository at this point in the history
Fix file backend
  • Loading branch information
AndrewChubatiuk authored and okushchenko committed Mar 18, 2018
1 parent 5601717 commit 1957caa
Show file tree
Hide file tree
Showing 17 changed files with 372 additions and 238 deletions.
4 changes: 2 additions & 2 deletions backends/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func New(config Config) (StoreClient, error) {
backendNodes := config.BackendNodes

if config.Backend == "file" {
log.Info("Backend source(s) set to " + config.YAMLFile)
log.Info("Backend source(s) set to " + strings.Join(config.YAMLFile, ", "))
} else {
log.Info("Backend source(s) set to " + strings.Join(backendNodes, ", "))
}
Expand Down Expand Up @@ -62,7 +62,7 @@ func New(config Config) (StoreClient, error) {
case "env":
return env.NewEnvClient()
case "file":
return file.NewFileClient(config.YAMLFile)
return file.NewFileClient(config.YAMLFile, config.Filter)
case "vault":
vaultConfig := map[string]string{
"app-id": config.AppID,
Expand Down
3 changes: 2 additions & 1 deletion backends/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Config struct {
UserID string
RoleID string
SecretID string
YAMLFile string
YAMLFile []string
Filter string
Role string
}
161 changes: 117 additions & 44 deletions backends/file/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,67 +3,131 @@ package file
import (
"fmt"
"io/ioutil"
"path"
"strconv"
"strings"

"github.com/fsnotify/fsnotify"
"github.com/kelseyhightower/confd/log"
util "github.com/kelseyhightower/confd/util"
"gopkg.in/yaml.v2"
)

var replacer = strings.NewReplacer("/", "_")

// Client provides a shell for the yaml client
type Client struct {
filepath string
filepath []string
filter string
}

func NewFileClient(filepath string) (*Client, error) {
return &Client{filepath}, nil
type ResultError struct {
response uint64
err error
}

func (c *Client) GetValues(keys []string) (map[string]string, error) {
yamlMap := make(map[interface{}]interface{})
vars := make(map[string]string)
func NewFileClient(filepath []string, filter string) (*Client, error) {
return &Client{filepath: filepath, filter: filter}, nil
}

data, err := ioutil.ReadFile(c.filepath)
func readFile(path string, vars map[string]string) error {
yamlMap := make(map[interface{}]interface{})
data, err := ioutil.ReadFile(path)
if err != nil {
return vars, err
return err
}

err = yaml.Unmarshal(data, &yamlMap)
if err != nil {
return vars, err
return err
}

nodeWalk(yamlMap, "", vars)
log.Debug(fmt.Sprintf("Key Map: %#v", vars))
err = nodeWalk(yamlMap, "/", vars)
if err != nil {
return err
}
return nil
}

func (c *Client) GetValues(keys []string) (map[string]string, error) {
vars := make(map[string]string)
var filePaths []string
for _, path := range c.filepath {
p, err := util.RecursiveFilesLookup(path, c.filter)
if err != nil {
return nil, err
}
filePaths = append(filePaths, p...)
}

for _, path := range filePaths {
err := readFile(path, vars)
if err != nil {
return nil, err
}
}

VarsLoop:
for k, _ := range vars {
for _, key := range keys {
if strings.HasPrefix(k, key) {
continue VarsLoop
}
}
delete(vars, k)
}
log.Debug(fmt.Sprintf("Key Map: %#v", vars))
return vars, nil
}

// nodeWalk recursively descends nodes, updating vars.
func nodeWalk(node map[interface{}]interface{}, key string, vars map[string]string) error {
for k, v := range node {
key := key + "/" + k.(string)

switch v.(type) {
case map[interface{}]interface{}:
nodeWalk(v.(map[interface{}]interface{}), key, vars)
case []interface{}:
for _, j := range v.([]interface{}) {
switch j.(type) {
case map[interface{}]interface{}:
nodeWalk(j.(map[interface{}]interface{}), key, vars)
case string:
vars[key+"/"+j.(string)] = ""
}
}
case string:
vars[key] = v.(string)
func nodeWalk(node interface{}, key string, vars map[string]string) error {
switch node.(type) {
case []interface{}:
for i, j := range node.([]interface{}) {
key := path.Join(key, strconv.Itoa(i))
nodeWalk(j, key, vars)
}
case map[interface{}]interface{}:
for k, v := range node.(map[interface{}]interface{}) {
key := path.Join(key, k.(string))
nodeWalk(v, key, vars)
}
case string:
vars[key] = node.(string)
case int:
vars[key] = strconv.Itoa(node.(int))
case bool:
vars[key] = strconv.FormatBool(node.(bool))
case float64:
vars[key] = strconv.FormatFloat(node.(float64), 'f', -1, 64)
}
return nil
}

func (c *Client) watchChanges(watcher *fsnotify.Watcher, stopChan chan bool) ResultError {
outputChannel := make(chan ResultError)
defer close(outputChannel)
go func() error {
for {
select {
case event := <-watcher.Events:
log.Debug("event:", event)
if event.Op&fsnotify.Write == fsnotify.Write ||
event.Op&fsnotify.Remove == fsnotify.Remove ||
event.Op&fsnotify.Create == fsnotify.Create {
outputChannel <- ResultError{response: 1, err: nil}
}
case err := <-watcher.Errors:
outputChannel <- ResultError{response: 0, err: err}
case <-stopChan:
outputChannel <- ResultError{response: 1, err: nil}
}
}
}()
return <-outputChannel
}

func (c *Client) WatchPrefix(prefix string, keys []string, waitIndex uint64, stopChan chan bool) (uint64, error) {
if waitIndex == 0 {
return 1, nil
Expand All @@ -74,23 +138,32 @@ func (c *Client) WatchPrefix(prefix string, keys []string, waitIndex uint64, sto
return 0, err
}
defer watcher.Close()

err = watcher.Add(c.filepath)
if err != nil {
return 0, err
}

for {
select {
case event := <-watcher.Events:
if event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Remove == fsnotify.Remove {
return 1, nil
}
case err := <-watcher.Errors:
for _, path := range c.filepath {
isDir, err := util.IsDirectory(path)
if err != nil {
return 0, err
case <-stopChan:
return 0, nil
}
if isDir {
dirs, err := util.RecursiveDirsLookup(path, "*")
if err != nil {
return 0, err
}
for _, dir := range dirs {
err = watcher.Add(dir)
if err != nil {
return 0, err
}
}
} else {
err = watcher.Add(path)
if err != nil {
return 0, err
}
}
}
output := c.watchChanges(watcher, stopChan)
if output.response != 2 {
return output.response, output.err
}
return waitIndex, nil
}
13 changes: 10 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ var (
userID string
roleID string
secretID string
yamlFile string
yamlFile Nodes
filter string
)

// A Config structure is used to configure confd.
Expand Down Expand Up @@ -85,7 +86,8 @@ type Config struct {
UserID string `toml:"user_id"`
RoleID string `toml:"role_id"`
SecretID string `toml:"secret_id"`
YAMLFile string `toml:"file"`
YAMLFile []string `toml:"file"`
Filter string `toml:"filter"`
}

func init() {
Expand All @@ -97,7 +99,8 @@ func init() {
flag.StringVar(&clientKey, "client-key", "", "the client key")
flag.StringVar(&confdir, "confdir", "/etc/confd", "confd conf directory")
flag.StringVar(&configFile, "config-file", "", "the confd config file")
flag.StringVar(&yamlFile, "file", "", "the YAML/JSON file to watch for changes")
flag.Var(&yamlFile, "file", "the YAML file to watch for changes (only used with -backend=file)")
flag.StringVar(&filter, "filter", "*", "files filter (only used with -backend=file)")
flag.IntVar(&interval, "interval", 600, "backend polling interval")
flag.BoolVar(&keepStageFile, "keep-stage-file", false, "keep staged files")
flag.StringVar(&logLevel, "log-level", "", "level which confd should log messages")
Expand Down Expand Up @@ -141,6 +144,7 @@ func initConfig() error {
Interval: 600,
Prefix: "",
Scheme: "http",
Filter: "*",
}
// Update config from the TOML configuration file.
if configFile == "" {
Expand Down Expand Up @@ -261,6 +265,7 @@ func initConfig() error {
RoleID: config.RoleID,
SecretID: config.SecretID,
YAMLFile: config.YAMLFile,
Filter: config.Filter,
}
// Template configuration.
templateConfig = template.Config{
Expand Down Expand Up @@ -373,5 +378,7 @@ func setConfigFromFlag(f *flag.Flag) {
config.SecretID = secretID
case "file":
config.YAMLFile = yamlFile
case "filter":
config.Filter = filter
}
}
1 change: 1 addition & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func TestInitConfigDefaultConfig(t *testing.T) {
Scheme: "http",
SecretKeyring: "",
Table: "",
Filter: "*",
}
if err := initConfig(); err != nil {
t.Errorf(err.Error())
Expand Down
4 changes: 3 additions & 1 deletion docs/command-line-flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ Usage of confd:
-config-file string
the confd config file
-file string
the YAML/JSON file to watch for changes
list of files/directories with data represented in YAML to watch for changes
-filter string
regex for files and dirs filtering
-interval int
backend polling interval (default 600)
-keep-stage-file
Expand Down
38 changes: 25 additions & 13 deletions integration/file/test.sh
Original file line number Diff line number Diff line change
@@ -1,27 +1,39 @@
#!/bin/bash

export HOSTNAME="localhost"

cat <<EOT >> test.yaml
mkdir backends1 backends2
cat <<EOT >> backends1/1.yaml
key: foobar
database:
- host: 127.0.0.1
- password: p@sSw0rd
- port: "3306"
- username: confd
host: 127.0.0.1
password: p@sSw0rd
port: "3306"
username: confd
EOT

cat <<EOT >> backends1/2.yaml
upstream:
- app1: 10.0.1.10:8080
- app2: 10.0.1.11:8080
app1: 10.0.1.10:8080
app2: 10.0.1.11:8080
EOT

cat <<EOT >> backends2/1.yaml
nested:
app1: 10.0.1.10:8080
app2: 10.0.1.11:8080
EOT

cat <<EOT >> backends2/2.yaml
prefix:
database:
- host: 127.0.0.1
- password: p@sSw0rd
- port: "3306"
- username: confd
host: 127.0.0.1
password: p@sSw0rd
port: "3306"
username: confd
upstream:
app1: 10.0.1.10:8080
app2: 10.0.1.11:8080
EOT

# Run confd
confd --onetime --log-level debug --confdir ./integration/confdir --backend file --file test.yaml --watch
confd --onetime --log-level debug --confdir ./integration/confdir --backend file --file backends1/ --file backends2/ --watch
7 changes: 4 additions & 3 deletions resource/template/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/kelseyhightower/confd/log"
util "github.com/kelseyhightower/confd/util"
)

type Processor interface {
Expand Down Expand Up @@ -91,7 +92,7 @@ func (p *watchProcessor) Process() {

func (p *watchProcessor) monitorPrefix(t *TemplateResource) {
defer p.wg.Done()
keys := appendPrefix(t.Prefix, t.Keys)
keys := util.AppendPrefix(t.Prefix, t.Keys)
for {
index, err := t.storeClient.WatchPrefix(t.Prefix, keys, t.lastIndex, p.stopChan)
if err != nil {
Expand All @@ -111,11 +112,11 @@ func getTemplateResources(config Config) ([]*TemplateResource, error) {
var lastError error
templates := make([]*TemplateResource, 0)
log.Debug("Loading template resources from confdir " + config.ConfDir)
if !isFileExist(config.ConfDir) {
if !util.IsFileExist(config.ConfDir) {
log.Warning(fmt.Sprintf("Cannot load template resources: confdir '%s' does not exist", config.ConfDir))
return nil, nil
}
paths, err := recursiveFindFiles(config.ConfigDir, "*toml")
paths, err := util.RecursiveFilesLookup(config.ConfigDir, "*toml")
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 1957caa

Please sign in to comment.