From 3d24457781930a0e013bf87aad1ff9cc66e1f1dd Mon Sep 17 00:00:00 2001
From: Nathaniel Cook
Date: Tue, 3 Nov 2015 11:06:59 -0700
Subject: [PATCH] make packages have clean uninstall process

---
 .gitignore                                 |  4 +-
 batch.go                                   | 51 ++++++++++------
 build.py                                   | 58 +++++++++++++------
 build.sh                                   |  1 +
 cmd/kapacitord/run/server.go               | 32 +++++-----
 .../kapacitor.conf}                        |  8 +++
 .../logrotate => etc/logrotate.d/kapacitor |  0
 scripts/post-uninstall.sh                  | 17 ++++++
 services/replay/service.go                 |  4 ++
 9 files changed, 122 insertions(+), 53 deletions(-)
 rename etc/{config.sample.toml => kapacitor/kapacitor.conf} (89%)
 rename scripts/logrotate => etc/logrotate.d/kapacitor (100%)
 create mode 100644 scripts/post-uninstall.sh

diff --git a/.gitignore b/.gitignore
index 3a5d16fde..991279fc6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,7 +1,7 @@
 .*.swp
 dist/*
-kapacitor
-kapacitord
+./kapacitor
+./kapacitord
 kapacitor_linux*
 kapacitord_linux*
 *~
diff --git a/batch.go b/batch.go
index eb3f0eb07..15316ef04 100644
--- a/batch.go
+++ b/batch.go
@@ -92,12 +92,14 @@ type BatchNode struct {
 	query  *Query
 	ticker ticker
 	wg     sync.WaitGroup
+	errC   chan error
 }
 
 func newBatchNode(et *ExecutingTask, n *pipeline.BatchNode) (*BatchNode, error) {
 	bn := &BatchNode{
 		node: node{Node: n, et: et},
 		b:    n,
+		errC: make(chan error, 1),
 	}
 	bn.node.runF = bn.runBatch
 	bn.node.stopF = bn.stopBatch
@@ -174,7 +176,9 @@ func (b *BatchNode) DBRPs() ([]DBRP, error) {
 
 func (b *BatchNode) Start(batch BatchCollector) {
 	b.wg.Add(1)
-	go b.doQuery(batch)
+	go func() {
+		b.errC <- b.doQuery(batch)
+	}()
 }
 
 func (b *BatchNode) Queries(start, stop time.Time) []string {
@@ -203,10 +207,14 @@ func (b *BatchNode) Queries(start, stop time.Time) []string {
 }
 
 // Query InfluxDB return Edge with data
-func (b *BatchNode) doQuery(batch BatchCollector) {
+func (b *BatchNode) doQuery(batch BatchCollector) error {
 	defer batch.Close()
 	defer b.wg.Done()
 
+	if b.et.tm.InfluxDBService == nil {
+		return errors.New("InfluxDB not configured, cannot query InfluxDB for batch query")
+	}
+
 	tickC := b.ticker.Start()
 	for now := range tickC {
 
@@ -220,8 +228,7 @@ func (b *BatchNode) doQuery(batch BatchCollector) {
 		// Connect
 		con, err := b.et.tm.InfluxDBService.NewClient()
 		if err != nil {
-			b.logger.Println("E! " + err.Error())
-			break
+			return err
 		}
 		q := client.Query{
 			Command: b.query.String(),
@@ -230,27 +237,25 @@
 		// Execute query
 		resp, err := con.Query(q)
 		if err != nil {
-			b.logger.Println("E! " + err.Error())
-			return
+			return err
 		}
 
 		if resp.Err != nil {
-			b.logger.Println("E! " + resp.Err.Error())
-			return
+			return resp.Err
 		}
 
 		// Collect batches
 		for _, res := range resp.Results {
 			batches, err := models.ResultToBatches(res)
 			if err != nil {
-				b.logger.Println("E!", err)
-				return
+				return err
 			}
 			for _, bch := range batches {
 				batch.CollectBatch(bch)
 			}
 		}
 	}
+	return errors.New("batch ticker schedule stopped")
 }
 
 func (b *BatchNode) stopBatch() {
@@ -261,22 +266,32 @@
 }
 
 func (b *BatchNode) runBatch() error {
-	for bt, ok := b.ins[0].NextBatch(); ok; bt, ok = b.ins[0].NextBatch() {
-		for _, child := range b.outs {
-			err := child.CollectBatch(bt)
-			if err != nil {
-				return err
+	errC := make(chan error, 1)
+	go func() {
+		for bt, ok := b.ins[0].NextBatch(); ok; bt, ok = b.ins[0].NextBatch() {
+			for _, child := range b.outs {
+				err := child.CollectBatch(bt)
+				if err != nil {
+					errC <- err
+					return
+				}
 			}
 		}
+		errC <- nil
+	}()
+	// Wait for errors
+	select {
+	case err := <-b.errC:
+		return err
+	case err := <-errC:
+		return err
 	}
-	return nil
 }
 
 type ticker interface {
 	Start() <-chan time.Time
 	Stop()
-	// Return the next time the ticker will tick
-	// after now.
+	// Return the next time the ticker will tick after now.
 	Next(now time.Time) time.Time
 }
 
diff --git a/build.py b/build.py
index 332834579..942e8224c 100755
--- a/build.py
+++ b/build.py
@@ -13,14 +13,13 @@
 LOG_DIR = "/var/log/kapacitor"
 DATA_DIR = "/var/lib/kapacitor"
 SCRIPT_DIR = "/usr/lib/kapacitor/scripts"
-CONFIG_DIR = "/etc/kapacitor"
-LOGROTATE_DIR = "/etc/logrotate.d"
 
 INIT_SCRIPT = "scripts/init.sh"
 SYSTEMD_SCRIPT = "scripts/kapacitor.service"
 POSTINST_SCRIPT = "scripts/post-install.sh"
-LOGROTATE_SCRIPT = "scripts/logrotate"
-CONFIG_SAMPLE = "etc/config.sample.toml"
+POSTUNINST_SCRIPT = "scripts/post-uninstall.sh"
+LOGROTATE_CONFIG = "etc/logrotate.d/kapacitor"
+DEFAULT_CONFIG = "etc/kapacitor/kapacitor.conf"
 PREINST_SCRIPT = None
 
 # META-PACKAGE VARIABLES
@@ -34,7 +33,33 @@
 prereqs = [ 'git', 'go' ]
 optional_prereqs = [ 'gvm', 'fpm', 'awscmd', 'rpmbuild' ]
 
-fpm_common_args = "-f -s dir --log error --vendor {} --url {} --after-install {} --license {} --maintainer {} --config-files {} --config-files {} --description \"{}\"".format(VENDOR, PACKAGE_URL, POSTINST_SCRIPT, PACKAGE_LICENSE, MAINTAINER, CONFIG_DIR, LOGROTATE_DIR, DESCRIPTION)
+fpm_common_args = "-f -s dir --log error \
+ --vendor {} \
+ --url {} \
+ --after-install {} \
+ --after-remove {} \
+ --license {} \
+ --maintainer {} \
+ --config-files {} \
+ --config-files {} \
+ --directories {} \
+ --description \"{}\"".format(
+    VENDOR,
+    PACKAGE_URL,
+    POSTINST_SCRIPT,
+    POSTUNINST_SCRIPT,
+    PACKAGE_LICENSE,
+    MAINTAINER,
+    DEFAULT_CONFIG,
+    LOGROTATE_CONFIG,
+    ' --directories '.join([
+         LOG_DIR[1:],
+         DATA_DIR[1:],
+         SCRIPT_DIR[1:],
+         os.path.dirname(SCRIPT_DIR[1:]),
+         os.path.dirname(DEFAULT_CONFIG),
+    ]),
+    DESCRIPTION)
 
 targets = {
     'kapacitor' : './cmd/kapacitor/main.go',
@@ -209,12 +234,9 @@ def create_dir(path):
 
 def move_file(fr, to):
     try:
-        os.rename(fr, to)
+        shutil.move(fr, to)
     except OSError as e:
-        try:
-            shutil.copyfile(fr, to)
-        except OSError as e:
-            print e
+        print e
 
 def create_package_fs(build_root):
     print "\t- Creating a filesystem hierarchy from directory: {}".format(build_root)
@@ -224,15 +246,15 @@ def create_package_fs(build_root):
     create_dir(os.path.join(build_root, LOG_DIR[1:]))
     create_dir(os.path.join(build_root, DATA_DIR[1:]))
     create_dir(os.path.join(build_root, SCRIPT_DIR[1:]))
-    create_dir(os.path.join(build_root, CONFIG_DIR[1:]))
-    create_dir(os.path.join(build_root, LOGROTATE_DIR[1:]))
+    create_dir(os.path.join(build_root, os.path.dirname(DEFAULT_CONFIG)))
+    create_dir(os.path.join(build_root, os.path.dirname(LOGROTATE_CONFIG)))
 
 def package_scripts(build_root):
-    print "\t- Copying scripts and sample configuration to build directory"
-    shutil.copyfile(INIT_SCRIPT, os.path.join(build_root, SCRIPT_DIR[1:], INIT_SCRIPT.split('/')[1]))
-    shutil.copyfile(SYSTEMD_SCRIPT, os.path.join(build_root, SCRIPT_DIR[1:], SYSTEMD_SCRIPT.split('/')[1]))
-    shutil.copyfile(LOGROTATE_SCRIPT, os.path.join(build_root, SCRIPT_DIR[1:], LOGROTATE_SCRIPT.split('/')[1]))
-    shutil.copyfile(CONFIG_SAMPLE, os.path.join(build_root, CONFIG_DIR[1:], CONFIG_SAMPLE.split('/')[1]))
+    print "\t- Copying scripts and configuration to build directory"
+    shutil.copy(INIT_SCRIPT, os.path.join(build_root, SCRIPT_DIR[1:], INIT_SCRIPT.split('/')[1]))
+    shutil.copy(SYSTEMD_SCRIPT, os.path.join(build_root, SCRIPT_DIR[1:], SYSTEMD_SCRIPT.split('/')[1]))
+    shutil.copy(LOGROTATE_CONFIG, os.path.join(build_root, LOGROTATE_CONFIG))
+    shutil.copy(DEFAULT_CONFIG, os.path.join(build_root, DEFAULT_CONFIG))
 
 def build_packages(build_output, version):
     TMP_BUILD_DIR = create_temp_dir()
@@ -256,7 +278,7 @@ def build_packages(build_output, version):
                 name = 'kapacitor'
                 if package_type in ['zip', 'tar']:
                     name = '{}-{}'.format(name, version)
-                fpm_command = "fpm {} --name {} -t {} --version {} -C {}".format(fpm_common_args,
+                fpm_command = "fpm {} --name {} -t {} --version {} -C {} ".format(fpm_common_args,
                                                                                  name,
                                                                                  package_type,
                                                                                  version,
diff --git a/build.sh b/build.sh
index b125feee4..b05e81f88 100755
--- a/build.sh
+++ b/build.sh
@@ -7,4 +7,5 @@ cd $DIR
 set -e
 # Run the build utility via Docker
 docker build --tag=kapacitor-builder $DIR
+echo "Running build.py"
 docker run -v $DIR:/gopath/src/github.com/influxdb/kapacitor kapacitor-builder "$@"
diff --git a/cmd/kapacitord/run/server.go b/cmd/kapacitord/run/server.go
index 73f1f1260..70998756c 100644
--- a/cmd/kapacitord/run/server.go
+++ b/cmd/kapacitord/run/server.go
@@ -45,13 +45,13 @@ type Server struct {
 
 	TaskMaster *kapacitor.TaskMaster
 
-	LogService    *logging.Service
-	HTTPDService  *httpd.Service
-	Streamer      *streamer.Service
-	TaskStore     *task_store.Service
-	ReplayService *replay.Service
-	InfluxDB      *influxdb.Service
-	SMTPService   *smtp.Service
+	LogService      *logging.Service
+	HTTPDService    *httpd.Service
+	Streamer        *streamer.Service
+	TaskStore       *task_store.Service
+	ReplayService   *replay.Service
+	InfluxDBService *influxdb.Service
+	SMTPService     *smtp.Service
 
 	MetaStore     *metastore
 	QueryExecutor *queryexecutor
@@ -136,14 +136,16 @@ func (s *Server) appendSMTPService(c smtp.Config) {
 }
 
 func (s *Server) appendInfluxDBService(c influxdb.Config, hostname string) {
-	l := s.LogService.NewLogger("[influxdb] ", log.LstdFlags)
-	srv := influxdb.NewService(c, hostname, l)
-	srv.PointsWriter = s.Streamer
-	srv.LogService = s.LogService
+	if c.Enabled {
+		l := s.LogService.NewLogger("[influxdb] ", log.LstdFlags)
+		srv := influxdb.NewService(c, hostname, l)
+		srv.PointsWriter = s.Streamer
+		srv.LogService = s.LogService
 
-	s.InfluxDB = srv
-	s.TaskMaster.InfluxDBService = srv
-	s.Services = append(s.Services, srv)
+		s.InfluxDBService = srv
+		s.TaskMaster.InfluxDBService = srv
+		s.Services = append(s.Services, srv)
+	}
 }
 
 func (s *Server) appendHTTPDService(c httpd.Config) {
@@ -174,7 +176,7 @@ func (s *Server) appendReplayStoreService(c replay.Config) {
 	srv := replay.NewService(c, l)
 	srv.TaskStore = s.TaskStore
 	srv.HTTPDService = s.HTTPDService
-	srv.InfluxDBService = s.InfluxDB
+	srv.InfluxDBService = s.InfluxDBService
 	srv.TaskMaster = s.TaskMaster
 
 	s.ReplayService = srv
diff --git a/etc/config.sample.toml b/etc/kapacitor/kapacitor.conf
similarity index 89%
rename from etc/config.sample.toml
rename to etc/kapacitor/kapacitor.conf
index e6507de5e..6b9c50aad 100644
--- a/etc/config.sample.toml
+++ b/etc/kapacitor/kapacitor.conf
@@ -16,6 +16,14 @@ reporting-disabled = false
   https-enabled = false
   https-certificate = "/etc/ssl/kapacitor.pem"
 
+[logging]
+  # Destination for logs
+  # Can be a path to a file or STDOUT, STDERR
+  file = "/var/log/kapacitor/kapacitor.log"
+  # Logging level can be one of:
+  # DEBUG, INFO, WARN, ERROR, or OFF
+  level = "INFO"
+
 [replay]
   # Where to store replay files
   dir = "/var/lib/kapacitor/replay"
diff --git a/scripts/logrotate b/etc/logrotate.d/kapacitor
similarity index 100%
rename from scripts/logrotate
rename to etc/logrotate.d/kapacitor
diff --git a/scripts/post-uninstall.sh b/scripts/post-uninstall.sh
new file mode 100644
index 000000000..159fbf25d
--- /dev/null
+++ b/scripts/post-uninstall.sh
@@ -0,0 +1,17 @@
+#!/bin/sh
+
+rm -f /etc/default/kapacitor
+
+# Systemd
+if which systemctl > /dev/null 2>&1 ; then
+    systemctl disable kapacitor
+    rm -f /lib/systemd/system/kapacitor.service
+# Sysv
+else
+    if which update-rc.d > /dev/null 2>&1 ; then
+        update-rc.d -f kapacitor remove
+    else
+        chkconfig --del kapacitor
+    fi
+    rm -f /etc/init.d/kapacitor
+fi
diff --git a/services/replay/service.go b/services/replay/service.go
index 7fc6b5d2b..64c2d3b53 100644
--- a/services/replay/service.go
+++ b/services/replay/service.go
@@ -3,6 +3,7 @@ package replay
 import (
 	"archive/zip"
 	"compress/gzip"
+	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -489,6 +490,9 @@ func (r *Service) doRecordBatch(rid uuid.UUID, t *kapacitor.Task, start, stop ti
 		return err
 	}
 
+	if r.InfluxDBService == nil {
+		return errors.New("InfluxDB not configured, cannot record batch query")
+	}
 	con, err := r.InfluxDBService.NewClient()
 	if err != nil {
 		return err