Skip to content

Commit

Permalink
make packages have clean uninstall process
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Nov 3, 2015
1 parent 96687f6 commit 3d24457
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 53 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
.*.swp
dist/*
kapacitor
kapacitord
./kapacitor
./kapacitord
kapacitor_linux*
kapacitord_linux*
*~
Expand Down
51 changes: 33 additions & 18 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,14 @@ type BatchNode struct {
query *Query
ticker ticker
wg sync.WaitGroup
errC chan error
}

func newBatchNode(et *ExecutingTask, n *pipeline.BatchNode) (*BatchNode, error) {
bn := &BatchNode{
node: node{Node: n, et: et},
b: n,
errC: make(chan error, 1),
}
bn.node.runF = bn.runBatch
bn.node.stopF = bn.stopBatch
Expand Down Expand Up @@ -174,7 +176,9 @@ func (b *BatchNode) DBRPs() ([]DBRP, error) {

func (b *BatchNode) Start(batch BatchCollector) {
b.wg.Add(1)
go b.doQuery(batch)
go func() {
b.errC <- b.doQuery(batch)
}()
}

func (b *BatchNode) Queries(start, stop time.Time) []string {
Expand Down Expand Up @@ -203,10 +207,14 @@ func (b *BatchNode) Queries(start, stop time.Time) []string {
}

// Query InfluxDB return Edge with data
func (b *BatchNode) doQuery(batch BatchCollector) {
func (b *BatchNode) doQuery(batch BatchCollector) error {
defer batch.Close()
defer b.wg.Done()

if b.et.tm.InfluxDBService == nil {
return errors.New("InfluxDB not configured, cannot query InfluxDB for batch query")
}

tickC := b.ticker.Start()
for now := range tickC {

Expand All @@ -220,8 +228,7 @@ func (b *BatchNode) doQuery(batch BatchCollector) {
// Connect
con, err := b.et.tm.InfluxDBService.NewClient()
if err != nil {
b.logger.Println("E! " + err.Error())
break
return err
}
q := client.Query{
Command: b.query.String(),
Expand All @@ -230,27 +237,25 @@ func (b *BatchNode) doQuery(batch BatchCollector) {
// Execute query
resp, err := con.Query(q)
if err != nil {
b.logger.Println("E! " + err.Error())
return
return err
}

if resp.Err != nil {
b.logger.Println("E! " + resp.Err.Error())
return
return resp.Err
}

// Collect batches
for _, res := range resp.Results {
batches, err := models.ResultToBatches(res)
if err != nil {
b.logger.Println("E!", err)
return
return err
}
for _, bch := range batches {
batch.CollectBatch(bch)
}
}
}
return errors.New("batch ticker schedule stopped")
}

func (b *BatchNode) stopBatch() {
Expand All @@ -261,22 +266,32 @@ func (b *BatchNode) stopBatch() {
}

func (b *BatchNode) runBatch() error {
for bt, ok := b.ins[0].NextBatch(); ok; bt, ok = b.ins[0].NextBatch() {
for _, child := range b.outs {
err := child.CollectBatch(bt)
if err != nil {
return err
errC := make(chan error, 1)
go func() {
for bt, ok := b.ins[0].NextBatch(); ok; bt, ok = b.ins[0].NextBatch() {
for _, child := range b.outs {
err := child.CollectBatch(bt)
if err != nil {
errC <- err
return
}
}
}
errC <- nil
}()
// Wait for errors
select {
case err := <-b.errC:
return err
case err := <-errC:
return err
}
return nil
}

type ticker interface {
Start() <-chan time.Time
Stop()
// Return the next time the ticker will tick
// after now.
// Return the next time the ticker will tick after now.
Next(now time.Time) time.Time
}

Expand Down
58 changes: 40 additions & 18 deletions build.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@
LOG_DIR = "/var/log/kapacitor"
DATA_DIR = "/var/lib/kapacitor"
SCRIPT_DIR = "/usr/lib/kapacitor/scripts"
CONFIG_DIR = "/etc/kapacitor"
LOGROTATE_DIR = "/etc/logrotate.d"

INIT_SCRIPT = "scripts/init.sh"
SYSTEMD_SCRIPT = "scripts/kapacitor.service"
POSTINST_SCRIPT = "scripts/post-install.sh"
LOGROTATE_SCRIPT = "scripts/logrotate"
CONFIG_SAMPLE = "etc/config.sample.toml"
POSTUNINST_SCRIPT = "scripts/post-uninstall.sh"
LOGROTATE_CONFIG = "etc/logrotate.d/kapacitor"
DEFAULT_CONFIG = "etc/kapacitor/kapacitor.conf"
PREINST_SCRIPT = None

# META-PACKAGE VARIABLES
Expand All @@ -34,7 +33,33 @@
prereqs = [ 'git', 'go' ]
optional_prereqs = [ 'gvm', 'fpm', 'awscmd', 'rpmbuild' ]

fpm_common_args = "-f -s dir --log error --vendor {} --url {} --after-install {} --license {} --maintainer {} --config-files {} --config-files {} --description \"{}\"".format(VENDOR, PACKAGE_URL, POSTINST_SCRIPT, PACKAGE_LICENSE, MAINTAINER, CONFIG_DIR, LOGROTATE_DIR, DESCRIPTION)
fpm_common_args = "-f -s dir --log error \
--vendor {} \
--url {} \
--after-install {} \
--after-remove {} \
--license {} \
--maintainer {} \
--config-files {} \
--config-files {} \
--directories {} \
--description \"{}\"".format(
VENDOR,
PACKAGE_URL,
POSTINST_SCRIPT,
POSTUNINST_SCRIPT,
PACKAGE_LICENSE,
MAINTAINER,
DEFAULT_CONFIG,
LOGROTATE_CONFIG,
' --directories '.join([
LOG_DIR[1:],
DATA_DIR[1:],
SCRIPT_DIR[1:],
os.path.dirname(SCRIPT_DIR[1:]),
os.path.dirname(DEFAULT_CONFIG),
]),
DESCRIPTION)

targets = {
'kapacitor' : './cmd/kapacitor/main.go',
Expand Down Expand Up @@ -209,12 +234,9 @@ def create_dir(path):

def move_file(fr, to):
try:
os.rename(fr, to)
shutil.move(fr, to)
except OSError as e:
try:
shutil.copyfile(fr, to)
except OSError as e:
print e
print e

def create_package_fs(build_root):
print "\t- Creating a filesystem hierarchy from directory: {}".format(build_root)
Expand All @@ -224,15 +246,15 @@ def create_package_fs(build_root):
create_dir(os.path.join(build_root, LOG_DIR[1:]))
create_dir(os.path.join(build_root, DATA_DIR[1:]))
create_dir(os.path.join(build_root, SCRIPT_DIR[1:]))
create_dir(os.path.join(build_root, CONFIG_DIR[1:]))
create_dir(os.path.join(build_root, LOGROTATE_DIR[1:]))
create_dir(os.path.join(build_root, os.path.dirname(DEFAULT_CONFIG)))
create_dir(os.path.join(build_root, os.path.dirname(LOGROTATE_CONFIG)))

def package_scripts(build_root):
print "\t- Copying scripts and sample configuration to build directory"
shutil.copyfile(INIT_SCRIPT, os.path.join(build_root, SCRIPT_DIR[1:], INIT_SCRIPT.split('/')[1]))
shutil.copyfile(SYSTEMD_SCRIPT, os.path.join(build_root, SCRIPT_DIR[1:], SYSTEMD_SCRIPT.split('/')[1]))
shutil.copyfile(LOGROTATE_SCRIPT, os.path.join(build_root, SCRIPT_DIR[1:], LOGROTATE_SCRIPT.split('/')[1]))
shutil.copyfile(CONFIG_SAMPLE, os.path.join(build_root, CONFIG_DIR[1:], CONFIG_SAMPLE.split('/')[1]))
print "\t- Copying scripts and configuration to build directory"
shutil.copy(INIT_SCRIPT, os.path.join(build_root, SCRIPT_DIR[1:], INIT_SCRIPT.split('/')[1]))
shutil.copy(SYSTEMD_SCRIPT, os.path.join(build_root, SCRIPT_DIR[1:], SYSTEMD_SCRIPT.split('/')[1]))
shutil.copy(LOGROTATE_CONFIG, os.path.join(build_root, LOGROTATE_CONFIG))
shutil.copy(DEFAULT_CONFIG, os.path.join(build_root, DEFAULT_CONFIG))

def build_packages(build_output, version):
TMP_BUILD_DIR = create_temp_dir()
Expand All @@ -256,7 +278,7 @@ def build_packages(build_output, version):
name = 'kapacitor'
if package_type in ['zip', 'tar']:
name = '{}-{}'.format(name, version)
fpm_command = "fpm {} --name {} -t {} --version {} -C {}".format(fpm_common_args,
fpm_command = "fpm {} --name {} -t {} --version {} -C {} ".format(fpm_common_args,
name,
package_type,
version,
Expand Down
1 change: 1 addition & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ cd $DIR
set -e
# Run the build utility via Docker
docker build --tag=kapacitor-builder $DIR
echo "Running build.py"
docker run -v $DIR:/gopath/src/github.com/influxdb/kapacitor kapacitor-builder "$@"
32 changes: 17 additions & 15 deletions cmd/kapacitord/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ type Server struct {

TaskMaster *kapacitor.TaskMaster

LogService *logging.Service
HTTPDService *httpd.Service
Streamer *streamer.Service
TaskStore *task_store.Service
ReplayService *replay.Service
InfluxDB *influxdb.Service
SMTPService *smtp.Service
LogService *logging.Service
HTTPDService *httpd.Service
Streamer *streamer.Service
TaskStore *task_store.Service
ReplayService *replay.Service
InfluxDBService *influxdb.Service
SMTPService *smtp.Service

MetaStore *metastore
QueryExecutor *queryexecutor
Expand Down Expand Up @@ -136,14 +136,16 @@ func (s *Server) appendSMTPService(c smtp.Config) {
}

func (s *Server) appendInfluxDBService(c influxdb.Config, hostname string) {
l := s.LogService.NewLogger("[influxdb] ", log.LstdFlags)
srv := influxdb.NewService(c, hostname, l)
srv.PointsWriter = s.Streamer
srv.LogService = s.LogService
if c.Enabled {
l := s.LogService.NewLogger("[influxdb] ", log.LstdFlags)
srv := influxdb.NewService(c, hostname, l)
srv.PointsWriter = s.Streamer
srv.LogService = s.LogService

s.InfluxDB = srv
s.TaskMaster.InfluxDBService = srv
s.Services = append(s.Services, srv)
s.InfluxDBService = srv
s.TaskMaster.InfluxDBService = srv
s.Services = append(s.Services, srv)
}
}

func (s *Server) appendHTTPDService(c httpd.Config) {
Expand Down Expand Up @@ -174,7 +176,7 @@ func (s *Server) appendReplayStoreService(c replay.Config) {
srv := replay.NewService(c, l)
srv.TaskStore = s.TaskStore
srv.HTTPDService = s.HTTPDService
srv.InfluxDBService = s.InfluxDB
srv.InfluxDBService = s.InfluxDBService
srv.TaskMaster = s.TaskMaster

s.ReplayService = srv
Expand Down
8 changes: 8 additions & 0 deletions etc/config.sample.toml → etc/kapacitor/kapacitor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ reporting-disabled = false
https-enabled = false
https-certificate = "/etc/ssl/kapacitor.pem"

[logging]
# Destination for logs
# Can be a path to a file or STDOUT, STDERR
file = "/var/log/kapacitor/kapacitor.log"
# Logging level can be one of:
# DEBUG, INFO, WARN, ERROR, or OFF
level = "INFO"

[replay]
# Where to store replay files
dir = "/var/lib/kapacitor/replay"
Expand Down
File renamed without changes.
17 changes: 17 additions & 0 deletions scripts/post-uninstall.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/sh

rm -f /etc/default/kapacitor

# Systemd
if which systemctl > /dev/null 2>&1 ; then
systemctl disable kapacitor
rm -f /lib/systemd/system/kapacitor.service
# Sysv
else
if which update-rc.d > /dev/null 2>&1 ; then
update-rc.d -f kapacitor remove
else
chkconfig --del kapacitor
fi
rm -f /etc/init.d/kapacitor
fi
4 changes: 4 additions & 0 deletions services/replay/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package replay
import (
"archive/zip"
"compress/gzip"
"errors"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -489,6 +490,9 @@ func (r *Service) doRecordBatch(rid uuid.UUID, t *kapacitor.Task, start, stop ti
return err
}

if r.InfluxDBService == nil {
return errors.New("InfluxDB not configured, cannot record batch query")
}
con, err := r.InfluxDBService.NewClient()
if err != nil {
return err
Expand Down

0 comments on commit 3d24457

Please sign in to comment.