Skip to content

Commit

Permalink
reorganize server into its own package (influxdata#699)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathaniel Cook authored Jul 5, 2016
1 parent bf51dbd commit c52281a
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 58 deletions.
15 changes: 8 additions & 7 deletions cmd/kapacitord/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strconv"

"github.com/BurntSushi/toml"
"github.com/influxdata/kapacitor/server"
"github.com/influxdata/kapacitor/services/logging"
"github.com/influxdata/kapacitor/tick"
)
Expand Down Expand Up @@ -41,7 +42,7 @@ type Command struct {
Stdout io.Writer
Stderr io.Writer

Server *Server
Server *server.Server
Logger *log.Logger
logService *logging.Service
}
Expand Down Expand Up @@ -108,16 +109,16 @@ func (cmd *Command) Run(args ...string) error {

// Mark start-up in log.,
cmd.Logger.Printf("I! Kapacitor starting, version %s, branch %s, commit %s", cmd.Version, cmd.Branch, cmd.Commit)
cmd.Logger.Printf("I! Go version %s, GOMAXPROCS set to %d", runtime.Version(), runtime.GOMAXPROCS(0))
cmd.Logger.Printf("I! Go version %s", runtime.Version())

// Write the PID file.
if err := cmd.writePIDFile(options.PIDFile); err != nil {
return fmt.Errorf("write pid file: %s", err)
}

// Create server from config and start it.
buildInfo := &BuildInfo{Version: cmd.Version, Commit: cmd.Commit, Branch: cmd.Branch}
s, err := NewServer(config, buildInfo, cmd.logService)
buildInfo := server.BuildInfo{Version: cmd.Version, Commit: cmd.Commit, Branch: cmd.Branch}
s, err := server.New(config, buildInfo, cmd.logService)
if err != nil {
return fmt.Errorf("create server: %s", err)
}
Expand Down Expand Up @@ -202,16 +203,16 @@ func (cmd *Command) writePIDFile(path string) error {

// ParseConfig parses the config at path.
// Returns a demo configuration if path is blank.
func (cmd *Command) ParseConfig(path string) (*Config, error) {
func (cmd *Command) ParseConfig(path string) (*server.Config, error) {
// Use demo configuration if no config path is specified.
if path == "" {
log.Println("no configuration provided, using default settings")
return NewDemoConfig()
return server.NewDemoConfig()
}

log.Printf("Using configuration at: %s\n", path)

config := NewConfig()
config := server.NewConfig()
if _, err := toml.DecodeFile(path, &config); err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions cmd/kapacitord/run/config_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"

"github.com/BurntSushi/toml"
"github.com/influxdata/kapacitor/server"
)

// PrintConfigCommand represents the command executed by "kapacitord config".
Expand Down Expand Up @@ -65,12 +66,12 @@ func (cmd *PrintConfigCommand) Run(args ...string) error {

// ParseConfig parses the config at path.
// Returns a demo configuration if path is blank.
func (cmd *PrintConfigCommand) parseConfig(path string) (*Config, error) {
func (cmd *PrintConfigCommand) parseConfig(path string) (*server.Config, error) {
if path == "" {
return NewDemoConfig()
return server.NewDemoConfig()
}

config := NewConfig()
config := server.NewConfig()
if _, err := toml.DecodeFile(path, &config); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/kapacitord/run/config.go → server/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package run
package server

import (
"errors"
Expand Down
8 changes: 4 additions & 4 deletions cmd/kapacitord/run/config_test.go → server/config_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package run_test
package server_test

import (
"os"
"testing"

"github.com/BurntSushi/toml"
"github.com/influxdata/kapacitor/cmd/kapacitord/run"
"github.com/influxdata/kapacitor/server"
)

// Ensure the configuration can be parsed.
func TestConfig_Parse(t *testing.T) {
// Parse configuration.
var c run.Config
var c server.Config
if _, err := toml.Decode(`
[replay]
dir = "/tmp/replay"
Expand All @@ -33,7 +33,7 @@ boltdb = "/tmp/kapacitor.db"
// Ensure the configuration can be parsed.
func TestConfig_Parse_EnvOverride(t *testing.T) {
// Parse configuration.
var c run.Config
var c server.Config
if _, err := toml.Decode(`
[[influxdb]]
urls=["http://localhost:8086"]
Expand Down
19 changes: 10 additions & 9 deletions cmd/kapacitord/run/server.go → server/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package run
// Provides a server type for starting and configuring a Kapacitor server.
package server

import (
"errors"
Expand Down Expand Up @@ -73,7 +74,7 @@ type Server struct {
InfluxDBService *influxdb.Service

MetaClient *kapacitor.NoopMetaClient
QueryExecutor *queryexecutor
QueryExecutor *Queryexecutor

Services []Service

Expand All @@ -87,21 +88,21 @@ type Server struct {
Logger *log.Logger
}

// NewServer returns a new instance of Server built from a config.
func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (*Server, error) {
// New returns a new instance of Server built from a config.
func New(c *Config, buildInfo BuildInfo, logService logging.Interface) (*Server, error) {
err := c.Validate()
if err != nil {
return nil, fmt.Errorf("%s. To generate a valid configuration file run `kapacitord config > kapacitor.generated.conf`.", err)
}
l := logService.NewLogger("[srv] ", log.LstdFlags)
s := &Server{
buildInfo: *buildInfo,
buildInfo: buildInfo,
dataDir: c.DataDir,
hostname: c.Hostname,
err: make(chan error),
LogService: logService,
MetaClient: &kapacitor.NoopMetaClient{},
QueryExecutor: &queryexecutor{},
QueryExecutor: &Queryexecutor{},
Logger: l,
}
s.Logger.Println("I! Kapacitor hostname:", s.hostname)
Expand Down Expand Up @@ -598,11 +599,11 @@ type tcpaddr struct{ host string }
func (a *tcpaddr) Network() string { return "tcp" }
func (a *tcpaddr) String() string { return a.host }

type queryexecutor struct{}
type Queryexecutor struct{}

func (qe *queryexecutor) Authorize(u *meta.UserInfo, q *influxql.Query, db string) error {
func (qe *Queryexecutor) Authorize(u *meta.UserInfo, q *influxql.Query, db string) error {
return nil
}
func (qe *queryexecutor) ExecuteQuery(q *influxql.Query, db string, chunkSize int) (<-chan *influxql.Result, error) {
func (qe *Queryexecutor) ExecuteQuery(q *influxql.Query, db string, chunkSize int) (<-chan *influxql.Result, error) {
return nil, errors.New("cannot execute queries against Kapacitor")
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// This package is a set of convenience helpers and structs to make integration testing easier
package run_test
package server_test

import (
"encoding/json"
Expand All @@ -17,27 +17,27 @@ import (

iclient "github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/kapacitor/client/v1"
"github.com/influxdata/kapacitor/cmd/kapacitord/run"
"github.com/influxdata/kapacitor/server"
"github.com/influxdata/wlog"
)

// Server represents a test wrapper for run.Server.
// Server represents a test wrapper for server.Server.
type Server struct {
*run.Server
Config *run.Config
*server.Server
Config *server.Config
}

// NewServer returns a new instance of Server.
func NewServer(c *run.Config) *Server {
func NewServer(c *server.Config) *Server {
configureLogging()
buildInfo := &run.BuildInfo{
buildInfo := server.BuildInfo{
Version: "testServer",
Commit: "testCommit",
Branch: "testBranch",
}
c.HTTP.LogEnabled = testing.Verbose()
ls := &LogService{}
srv, err := run.NewServer(c, buildInfo, ls)
srv, err := server.New(c, buildInfo, ls)
if err != nil {
panic(err)
}
Expand All @@ -57,7 +57,7 @@ func OpenDefaultServer() (*Server, *client.Client) {
}

// OpenServer opens a test server.
func OpenServer(c *run.Config) *Server {
func OpenServer(c *server.Config) *Server {
s := NewServer(c)
if err := s.Open(); err != nil {
panic(err.Error())
Expand Down Expand Up @@ -171,8 +171,8 @@ func (s *Server) Stats() (stats, error) {
}

// NewConfig returns the default config with temporary paths.
func NewConfig() *run.Config {
c := run.NewConfig()
func NewConfig() *server.Config {
c := server.NewConfig()
c.PostInit()
c.Reporting.Enabled = false
c.Replay.Dir = MustTempFile()
Expand Down
35 changes: 12 additions & 23 deletions cmd/kapacitord/run/server_test.go → server/server_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package run_test
package server_test

import (
"bufio"
Expand All @@ -23,10 +23,17 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/toml"
"github.com/influxdata/kapacitor/client/v1"
"github.com/influxdata/kapacitor/cmd/kapacitord/run"
"github.com/influxdata/kapacitor/server"
"github.com/influxdata/kapacitor/services/udf"
)

var udfDir string

func init() {
dir, _ := os.Getwd()
udfDir = filepath.Clean(filepath.Join(dir, "../udf"))
}

func TestServer_Ping(t *testing.T) {
s, cli := OpenDefaultServer()
t.Log(s.URL())
Expand Down Expand Up @@ -3502,12 +3509,6 @@ func TestServer_ReplayQuery(t *testing.T) {
// If this test fails due to missing python dependencies, run 'INSTALL_PREFIX=/usr/local ./install-deps.sh' from the root directory of the
// kapacitor project.
func TestServer_UDFStreamAgents(t *testing.T) {
dir, err := os.Getwd()
if err != nil {
t.Fatal(err)
}
udfDir := filepath.Clean(filepath.Join(dir, "../../../udf"))

tdir, err := ioutil.TempDir("", "kapacitor_server_test")
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -3573,7 +3574,7 @@ func TestServer_UDFStreamAgents(t *testing.T) {
}
}

func testStreamAgent(t *testing.T, c *run.Config) {
func testStreamAgent(t *testing.T, c *server.Config) {
s := NewServer(c)
err := s.Open()
if err != nil {
Expand Down Expand Up @@ -3669,12 +3670,6 @@ test,group=b value=0 0000000011
// If this test fails due to missing python dependencies, run 'INSTALL_PREFIX=/usr/local ./install-deps.sh' from the root directory of the
// kapacitor project.
func TestServer_UDFStreamAgentsSocket(t *testing.T) {
dir, err := os.Getwd()
if err != nil {
t.Fatal(err)
}
udfDir := filepath.Clean(filepath.Join(dir, "../../../udf"))

tdir, err := ioutil.TempDir("", "kapacitor_server_test")
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -3755,7 +3750,7 @@ func TestServer_UDFStreamAgentsSocket(t *testing.T) {
}
}

func testStreamAgentSocket(t *testing.T, c *run.Config) {
func testStreamAgentSocket(t *testing.T, c *server.Config) {
s := NewServer(c)
err := s.Open()
if err != nil {
Expand Down Expand Up @@ -3836,12 +3831,6 @@ test,group=a value=0 0000000011
// If this test fails due to missing python dependencies, run 'INSTALL_PREFIX=/usr/local ./install-deps.sh' from the root directory of the
// kapacitor project.
func TestServer_UDFBatchAgents(t *testing.T) {
dir, err := os.Getwd()
if err != nil {
t.Fatal(err)
}
udfDir := filepath.Clean(filepath.Join(dir, "../../../udf"))

tdir, err := ioutil.TempDir("", "kapacitor_server_test")
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -3907,7 +3896,7 @@ func TestServer_UDFBatchAgents(t *testing.T) {
}
}

func testBatchAgent(t *testing.T, c *run.Config) {
func testBatchAgent(t *testing.T, c *server.Config) {
count := 0
stopTimeC := make(chan time.Time, 2)
db := NewInfluxDB(func(q string) *iclient.Response {
Expand Down

0 comments on commit c52281a

Please sign in to comment.