Skip to content

Commit

Permalink
Move apiserver to a package
Browse files Browse the repository at this point in the history
  • Loading branch information
horkhe committed Nov 28, 2015
1 parent a0ab11c commit 929ff89
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 248 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ test:
go test -v -timeout 3m github.com/mailgun/kafka-pixy/consumer -check.vv
go test -v -timeout 3m github.com/mailgun/kafka-pixy/logging -check.vv
go test -v -timeout 3m github.com/mailgun/kafka-pixy/prettyfmt -check.vv
go test -v -timeout 3m github.com/mailgun/kafka-pixy/pixy -check.vv
go test -v -timeout 3m github.com/mailgun/kafka-pixy/producer -check.vv
go test -v -timeout 3m github.com/mailgun/kafka-pixy/service -check.vv

rebuild:
go clean -i
Expand Down
84 changes: 42 additions & 42 deletions pixy/httpapi.go → apiserver/apiserver.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pixy
package apiserver

import (
"encoding/json"
Expand All @@ -24,21 +24,21 @@ const (
NetworkUnix = "unix"

// HTTP headers used by the API.
HeaderContentLength = "Content-Length"
HeaderContentType = "Content-Type"
headerContentLength = "Content-Length"
headerContentType = "Content-Type"

// HTTP request parameters.
ParamTopic = "topic"
ParamKey = "key"
ParamSync = "sync"
ParamGroup = "group"
paramTopic = "topic"
paramKey = "key"
paramSync = "sync"
paramGroup = "group"
)

var (
EmptyResponse = map[string]interface{}{}
)

type HTTPAPIServer struct {
type T struct {
addr string
listener net.Listener
httpServer *manners.GracefulServer
Expand All @@ -48,10 +48,10 @@ type HTTPAPIServer struct {
errorCh chan error
}

// NewHTTPAPIServer creates an HTTP server instance that will accept API
// requests at the specified `network`/`address` and execute them with the
// specified `producer`, `consumer`, or `admin`, depending on the request type.
func NewHTTPAPIServer(network, addr string, producer *producer.T, consumer *consumer.T, admin *admin.T) (*HTTPAPIServer, error) {
// New creates an HTTP server instance that will accept API requests at the
// specified `network`/`address` and execute them with the specified `producer`,
// `consumer`, or `admin`, depending on the request type.
func New(network, addr string, producer *producer.T, consumer *consumer.T, admin *admin.T) (*T, error) {
// Start listening on the specified network/address.
listener, err := net.Listen(network, addr)
if err != nil {
Expand All @@ -66,7 +66,7 @@ func NewHTTPAPIServer(network, addr string, producer *producer.T, consumer *cons
// Create a graceful HTTP server instance.
router := mux.NewRouter()
httpServer := manners.NewWithServer(&http.Server{Handler: router})
as := &HTTPAPIServer{
as := &T{
addr: addr,
listener: manners.NewListener(listener),
httpServer: httpServer,
Expand All @@ -76,22 +76,22 @@ func NewHTTPAPIServer(network, addr string, producer *producer.T, consumer *cons
errorCh: make(chan error, 1),
}
// Configure the API request handlers.
router.HandleFunc(fmt.Sprintf("/topics/{%s}/messages", ParamTopic),
router.HandleFunc(fmt.Sprintf("/topics/{%s}/messages", paramTopic),
as.handleProduce).Methods("POST")
router.HandleFunc(fmt.Sprintf("/topics/{%s}/messages", ParamTopic),
router.HandleFunc(fmt.Sprintf("/topics/{%s}/messages", paramTopic),
as.handleConsume).Methods("GET")
router.HandleFunc(fmt.Sprintf("/topics/{%s}/offsets", ParamTopic),
router.HandleFunc(fmt.Sprintf("/topics/{%s}/offsets", paramTopic),
as.handleGetOffsets).Methods("GET")
router.HandleFunc(fmt.Sprintf("/topics/{%s}/offsets", ParamTopic),
router.HandleFunc(fmt.Sprintf("/topics/{%s}/offsets", paramTopic),
as.handleSetOffsets).Methods("POST")
router.HandleFunc(fmt.Sprintf("/topics/{%s}/consumers", ParamTopic),
router.HandleFunc(fmt.Sprintf("/topics/{%s}/consumers", paramTopic),
as.handleGetTopicConsumers).Methods("GET")
return as, nil
}

// Starts triggers asynchronous HTTP server start. If it fails then the error
// will be sent down to `HTTPAPIServer.ErrorCh()`.
func (as *HTTPAPIServer) Start() {
func (as *T) Start() {
go func() {
hid := sarama.RootCID.NewChild(fmt.Sprintf("API@%s", as.addr))
defer hid.LogScope()()
Expand All @@ -105,35 +105,35 @@ func (as *HTTPAPIServer) Start() {
// ErrorCh returns an output channel that HTTP server running in another
// goroutine will use if it stops with error if one occurs. The channel will be
// closed when the server is fully stopped due to an error or otherwise..
func (as *HTTPAPIServer) ErrorCh() <-chan error {
func (as *T) ErrorCh() <-chan error {
return as.errorCh
}

// AsyncStop triggers HTTP API listener stop. If a caller wants to know when
// the server terminates it should read from the `Error()` channel that will be
// closed upon server termination.
func (as *HTTPAPIServer) AsyncStop() {
func (as *T) AsyncStop() {
as.httpServer.Close()
}

// handleProduce is an HTTP request handler for `POST /topic/{topic}/messages`
func (as *HTTPAPIServer) handleProduce(w http.ResponseWriter, r *http.Request) {
func (as *T) handleProduce(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

topic := mux.Vars(r)[ParamTopic]
key := getParamBytes(r, ParamKey)
_, isSync := r.Form[ParamSync]
topic := mux.Vars(r)[paramTopic]
key := getParamBytes(r, paramKey)
_, isSync := r.Form[paramSync]

// Get the message body from the HTTP request.
if _, ok := r.Header[HeaderContentLength]; !ok {
errorText := fmt.Sprintf("Missing %s header", HeaderContentLength)
if _, ok := r.Header[headerContentLength]; !ok {
errorText := fmt.Sprintf("Missing %s header", headerContentLength)
respondWithJSON(w, http.StatusBadRequest, errorHTTPResponse{errorText})
return
}
messageSizeStr := r.Header.Get(HeaderContentLength)
messageSizeStr := r.Header.Get(headerContentLength)
messageSize, err := strconv.Atoi(messageSizeStr)
if err != nil {
errorText := fmt.Sprintf("Invalid %s header: %s", HeaderContentLength, messageSizeStr)
errorText := fmt.Sprintf("Invalid %s header: %s", headerContentLength, messageSizeStr)
respondWithJSON(w, http.StatusBadRequest, errorHTTPResponse{errorText})
return
}
Expand All @@ -145,7 +145,7 @@ func (as *HTTPAPIServer) handleProduce(w http.ResponseWriter, r *http.Request) {
}
if len(message) != messageSize {
errorText := fmt.Sprintf("Message size does not match %s: expected=%v, actual=%v",
HeaderContentLength, messageSize, len(message))
headerContentLength, messageSize, len(message))
respondWithJSON(w, http.StatusBadRequest, errorHTTPResponse{errorText})
return
}
Expand Down Expand Up @@ -177,10 +177,10 @@ func (as *HTTPAPIServer) handleProduce(w http.ResponseWriter, r *http.Request) {
}

// handleConsume is an HTTP request handler for `GET /topic/{topic}/messages`
func (as *HTTPAPIServer) handleConsume(w http.ResponseWriter, r *http.Request) {
func (as *T) handleConsume(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

topic := mux.Vars(r)[ParamTopic]
topic := mux.Vars(r)[paramTopic]
group, err := getGroupParam(r)
if err != nil {
respondWithJSON(w, http.StatusBadRequest, errorHTTPResponse{err.Error()})
Expand Down Expand Up @@ -211,10 +211,10 @@ func (as *HTTPAPIServer) handleConsume(w http.ResponseWriter, r *http.Request) {
}

// handleGetOffsets is an HTTP request handler for `GET /topic/{topic}/offsets`
func (as *HTTPAPIServer) handleGetOffsets(w http.ResponseWriter, r *http.Request) {
func (as *T) handleGetOffsets(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

topic := mux.Vars(r)[ParamTopic]
topic := mux.Vars(r)[paramTopic]
group, err := getGroupParam(r)
if err != nil {
respondWithJSON(w, http.StatusBadRequest, errorHTTPResponse{err.Error()})
Expand Down Expand Up @@ -251,10 +251,10 @@ func (as *HTTPAPIServer) handleGetOffsets(w http.ResponseWriter, r *http.Request
}

// handleGetOffsets is an HTTP request handler for `POST /topic/{topic}/offsets`
func (as *HTTPAPIServer) handleSetOffsets(w http.ResponseWriter, r *http.Request) {
func (as *T) handleSetOffsets(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

topic := mux.Vars(r)[ParamTopic]
topic := mux.Vars(r)[paramTopic]
group, err := getGroupParam(r)
if err != nil {
respondWithJSON(w, http.StatusBadRequest, errorHTTPResponse{err.Error()})
Expand Down Expand Up @@ -296,15 +296,15 @@ func (as *HTTPAPIServer) handleSetOffsets(w http.ResponseWriter, r *http.Request
}

// handleGetTopicConsumers is an HTTP request handler for `GET /topic/{topic}/consumers`
func (as *HTTPAPIServer) handleGetTopicConsumers(w http.ResponseWriter, r *http.Request) {
func (as *T) handleGetTopicConsumers(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
var err error

topic := mux.Vars(r)[ParamTopic]
topic := mux.Vars(r)[paramTopic]

group := ""
r.ParseForm()
groups := r.Form[ParamGroup]
groups := r.Form[paramGroup]
if len(groups) > 1 {
err = fmt.Errorf("One consumer group is expected, but %d provided", len(groups))
respondWithJSON(w, http.StatusBadRequest, errorHTTPResponse{err.Error()})
Expand Down Expand Up @@ -345,7 +345,7 @@ func (as *HTTPAPIServer) handleGetTopicConsumers(w http.ResponseWriter, r *http.
}
encodedRes = prettyfmt.CollapseJSON(encodedRes)

w.Header().Add(HeaderContentType, "application/json")
w.Header().Add(headerContentType, "application/json")
w.WriteHeader(http.StatusOK)
if _, err := w.Write(encodedRes); err != nil {
log.Errorf("Failed to send HTTP reponse: status=%d, body=%v, reason=%v", http.StatusOK, encodedRes, err)
Expand Down Expand Up @@ -400,7 +400,7 @@ func respondWithJSON(w http.ResponseWriter, status int, body interface{}) {
return
}

w.Header().Add(HeaderContentType, "application/json")
w.Header().Add(headerContentType, "application/json")
w.WriteHeader(status)
if _, err := w.Write(encodedRes); err != nil {
log.Errorf("Failed to send HTTP reponse: status=%d, body=%v, reason=%v", status, body, err)
Expand All @@ -409,7 +409,7 @@ func respondWithJSON(w http.ResponseWriter, status int, body interface{}) {

func getGroupParam(r *http.Request) (string, error) {
r.ParseForm()
groups := r.Form[ParamGroup]
groups := r.Form[paramGroup]
if len(groups) != 1 {
return "", fmt.Errorf("One consumer group is expected, but %d provided", len(groups))
}
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/mailgun/kafka-pixy/Godeps/_workspace/src/github.com/mailgun/log"
"github.com/mailgun/kafka-pixy/config"
"github.com/mailgun/kafka-pixy/logging"
"github.com/mailgun/kafka-pixy/pixy"
"github.com/mailgun/kafka-pixy/service"
)

const (
Expand Down Expand Up @@ -79,7 +79,7 @@ func main() {
}

log.Infof("Starting with config: %+v", cfg)
svc, err := pixy.SpawnService(cfg)
svc, err := service.Spawn(cfg)
if err != nil {
log.Errorf("Failed to start service: err=(%s)", err)
os.Exit(1)
Expand Down
124 changes: 0 additions & 124 deletions pixy/pixy_test.go

This file was deleted.

Loading

0 comments on commit 929ff89

Please sign in to comment.