add 2-way SSL to secure Glow cluster
chrislusf committed Feb 14, 2016
1 parent 9850f59 commit f00fa06
Showing 25 changed files with 278 additions and 216 deletions.
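Before the per-file diffs, here is a minimal sketch of what a 2-way SSL (mutual TLS) configuration typically looks like in Go. It is an assumption about what the CertFiles.MakeTLSConfig helper used throughout this commit produces, not the repository's actual implementation; the function name and file paths below are illustrative.

package main

import (
    "crypto/tls"
    "crypto/x509"
    "io/ioutil"
    "log"
)

// makeMutualTLSConfig is a hypothetical stand-in for CertFiles.MakeTLSConfig.
func makeMutualTLSConfig(certFile, keyFile, caFile string) *tls.Config {
    // Load this node's own certificate and private key.
    cert, err := tls.LoadX509KeyPair(certFile, keyFile)
    if err != nil {
        log.Fatal(err)
    }
    // Load the CA that signed every peer's certificate.
    caPEM, err := ioutil.ReadFile(caFile)
    if err != nil {
        log.Fatal(err)
    }
    pool := x509.NewCertPool()
    pool.AppendCertsFromPEM(caPEM)
    return &tls.Config{
        Certificates: []tls.Certificate{cert},
        RootCAs:      pool, // verify servers this node dials
        ClientCAs:    pool, // verify clients that dial this node
        ClientAuth:   tls.RequireAndVerifyClientCert, // the "2-way" part
    }
}

func main() {
    // Placeholder paths mirroring the -cert.file, -key.file and -ca.file flags.
    cfg := makeMutualTLSConfig("node.crt", "node.key", "ca.crt")
    log.Println("client auth policy:", cfg.ClientAuth)
}

Servers hand such a config to tls.Listen and clients to tls.Dial or an http.Transport, which is the pattern the hunks below switch to whenever certificates are configured.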
29 changes: 17 additions & 12 deletions agent/agent_server.go
@@ -26,6 +26,7 @@ import (

type AgentServerOption struct {
Master *string
Host *string
Port *int
Dir *string
DataCenter *string
@@ -40,7 +41,6 @@ type AgentServerOption struct {
type AgentServer struct {
Option *AgentServerOption
Master string
Port int
wg sync.WaitGroup
listener net.Listener
computeResource *resource.ComputeResource
@@ -61,7 +61,6 @@ func NewAgentServer(option *AgentServerOption) *AgentServer {
as := &AgentServer{
Option: option,
Master: *option.Master,
Port: *option.Port,
storageBackend: NewLocalDatasetShardsManager(*option.Dir, *option.Port),
computeResource: &resource.ComputeResource{
CPUCount: *option.MaxExecutor,
@@ -80,27 +79,23 @@ func NewAgentServer(option *AgentServerOption) *AgentServer {
return as
}

// Start starts to listen on a port, returning the listening port
// r.Port can be pre-set or leave it as zero
// The actual port set to r.Port
func (r *AgentServer) init() (err error) {
tlsConfig := r.Option.CertFiles.MakeTLSConfig()
if tlsConfig == nil {
r.listener, err = net.Listen("tcp", ":"+strconv.Itoa(r.Port))
r.listener, err = net.Listen("tcp", *r.Option.Host+":"+strconv.Itoa(*r.Option.Port))
} else {
r.listener, err = tls.Listen("tcp", ":"+strconv.Itoa(r.Port), tlsConfig)
r.listener, err = tls.Listen("tcp", *r.Option.Host+":"+strconv.Itoa(*r.Option.Port), tlsConfig)
}
util.SetupHttpClient(tlsConfig)
if err != nil {
log.Fatal(err)
}
util.SetupHttpClient(tlsConfig)

r.Port = r.listener.Addr().(*net.TCPAddr).Port
fmt.Println("AgentServer starts on:", r.Port)
fmt.Println("AgentServer starts on", *r.Option.Host+":"+strconv.Itoa(*r.Option.Port))

if *r.Option.CleanRestart {
if fileInfos, err := ioutil.ReadDir(r.storageBackend.dir); err == nil {
suffix := fmt.Sprintf("-%d.dat", r.Port)
suffix := fmt.Sprintf("-%d.dat", *r.Option.Port)
for _, fi := range fileInfos {
name := fi.Name()
if !fi.IsDir() && strings.HasSuffix(name, suffix) {
@@ -117,7 +112,7 @@ func (r *AgentServer) init() (err error) {
func (as *AgentServer) Run() {
//register agent
killHeartBeaterChan := make(chan bool, 1)
go client.NewHeartBeater(as.Port, as.Master).StartAgentHeartBeat(killHeartBeaterChan, func(values url.Values) {
go client.NewHeartBeater(*as.Option.Host, *as.Option.Port, as.Master).StartAgentHeartBeat(killHeartBeaterChan, func(values url.Values) {
resource.AddToValues(values, as.computeResource, as.allocatedResource)
values.Add("dataCenter", *as.Option.DataCenter)
values.Add("rack", *as.Option.Rack)
@@ -151,6 +146,16 @@ func (r *AgentServer) handleRequest(conn net.Conn) {
buf := make([]byte, 4)

f, message, err := util.ReadBytes(conn, buf)

tlscon, ok := conn.(*tls.Conn)
if ok {
state := tlscon.ConnectionState()
if !state.HandshakeComplete {
log.Printf("Failed to tls handshake with: %+v", tlscon.RemoteAddr())
return
}
}

if f != util.Data {
//strange if this happens
println("read", len(message.Bytes()), "request flag:", f, "data", string(message.Data()))
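The handshake check added to handleRequest above assumes peers dialed with a client certificate. A minimal dialing sketch, not from the repository, assuming a mutual-TLS *tls.Config like the one outlined at the top:

package sketch

import (
    "crypto/tls"
    "net"
)

// dialPeer mirrors how a Glow component would reach an agent: plain TCP when
// no certificates are configured, TLS with a client certificate otherwise.
func dialPeer(addr string, tlsConfig *tls.Config) (net.Conn, error) {
    if tlsConfig == nil {
        return net.Dial("tcp", addr)
    }
    // tls.Dial completes the handshake with the client certificate in tlsConfig,
    // which is what the HandshakeComplete check on the accepting side expects.
    return tls.Dial("tcp", addr, tlsConfig)
}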
11 changes: 7 additions & 4 deletions agent/agent_server_cmd.go
@@ -13,10 +13,13 @@ func (as *AgentServer) handleCommandConnection(conn net.Conn,
switch command.GetType() {
case cmd.ControlMessage_StartRequest:
reply.Type = cmd.ControlMessage_StartResponse.Enum()
remoteAddress := conn.RemoteAddr().String()
// println("remote address is", remoteAddress)
host := remoteAddress[:strings.LastIndex(remoteAddress, ":")]
command.StartRequest.Host = &host
// println("start from", *command.StartRequest.Host)
if *command.StartRequest.Host == "" {
remoteAddress := conn.RemoteAddr().String()
// println("remote address is", remoteAddress)
host := remoteAddress[:strings.LastIndex(remoteAddress, ":")]
command.StartRequest.Host = &host
}
reply.StartResponse = as.handleStart(conn, command.StartRequest)
// return nil to avoid writing the response to the connection.
// Currently the connection is used for reading outputs
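An aside, not part of the commit: the host is carved out of "host:port" with strings.LastIndex above. net.SplitHostPort from the standard library does the same split and also unwraps bracketed IPv6 literals, as this small sketch shows (the address is a placeholder):

package main

import (
    "fmt"
    "net"
)

func main() {
    remote := "[::1]:52814" // placeholder remote address in IPv6 form
    host, port, err := net.SplitHostPort(remote)
    if err != nil {
        panic(err)
    }
    fmt.Println(host, port) // prints: ::1 52814
}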
17 changes: 14 additions & 3 deletions agent/agent_server_executor_start.go
@@ -8,6 +8,7 @@ import (
"os/exec"
"path"
"strconv"
"strings"
"time"

"github.com/chrislusf/glow/driver/cmd"
@@ -68,9 +69,17 @@ func (as *AgentServer) handleStart(conn net.Conn,

func (as *AgentServer) adjustArgs(args []string, requestId uint32) (ret []string) {
if as.Option.CertFiles.IsEnabled() {
for i := 0; i < len(args); i++ {
ret = append(ret, args[i])
switch args[i] {
var cleanedArgs []string
for _, arg := range args {
if strings.Contains(arg, "=") {
cleanedArgs = append(cleanedArgs, strings.SplitN(arg, "=", 2)...)
} else {
cleanedArgs = append(cleanedArgs, arg)
}
}
for i := 0; i < len(cleanedArgs); i++ {
ret = append(ret, cleanedArgs[i])
switch cleanedArgs[i] {
case "-cert.file":
ret = append(ret, as.Option.CertFiles.CertFile)
i++
@@ -87,6 +96,8 @@ }
}
ret = append(ret, "-glow.request.id")
ret = append(ret, fmt.Sprintf("%d", requestId))

// fmt.Printf("cmd: %v\n", ret)
return
}
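A self-contained illustration (argument values are made up) of the normalization adjustArgs now performs: tokens in -flag=value form are split into separate flag and value tokens, so the switch on flag names such as -cert.file can replace the value that follows with the agent's own path.

package main

import (
    "fmt"
    "strings"
)

func main() {
    // Hypothetical forwarded arguments; only the splitting behavior is shown.
    args := []string{"-cert.file=old.pem", "-glow.leader", "localhost:8930"}
    var cleaned []string
    for _, arg := range args {
        if strings.Contains(arg, "=") {
            cleaned = append(cleaned, strings.SplitN(arg, "=", 2)...)
        } else {
            cleaned = append(cleaned, arg)
        }
    }
    fmt.Println(cleaned) // prints: [-cert.file old.pem -glow.leader localhost:8930]
}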

52 changes: 34 additions & 18 deletions driver/context_driver.go
@@ -28,12 +28,14 @@ type DriverOption struct {
Module string
RelatedFiles string
ShowFlowStats bool
ListenOn string
Host string
Port int
CertFiles netchan.CertFiles
}

var driverOption DriverOption

func init() {
var driverOption DriverOption
flag.BoolVar(&driverOption.ShouldStart, "glow", false, "start in driver mode")
flag.StringVar(&driverOption.Leader, "glow.leader", "localhost:8930", "leader server")
flag.StringVar(&driverOption.DataCenter, "glow.dataCenter", "", "preferred data center name")
@@ -44,7 +46,8 @@ func init() {
flag.StringVar(&driverOption.Module, "glow.module", "", "a name to group related jobs together on agent")
flag.StringVar(&driverOption.RelatedFiles, "glow.related.files", "", strconv.QuoteRune(os.PathListSeparator)+" separated list of files")
flag.BoolVar(&driverOption.ShowFlowStats, "glow.flow.stat", false, "show flow details at the end of execution")
flag.StringVar(&driverOption.ListenOn, "glow.driver.listenOn", ":0", "listen on this address to copy itself and related files to agents")
flag.StringVar(&driverOption.Host, "glow.driver.host", "", "driver runs on this host address. Required in 2-way SSL mode.")
flag.IntVar(&driverOption.Port, "glow.driver.port", 0, "driver listens on this port to copy files to agents. Required to specify and open this port.")
flag.StringVar(&driverOption.CertFiles.CertFile, "cert.file", "", "A PEM encoded certificate file")
flag.StringVar(&driverOption.CertFiles.KeyFile, "key.file", "", "A PEM encoded private key file")
flag.StringVar(&driverOption.CertFiles.CaFile, "ca.file", "", "A PEM encoded CA's certificate file")
@@ -53,22 +56,30 @@ func init() {
}
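A hedged usage sketch of the flags registered above, for running a Glow program in 2-way SSL mode; the binary name, host names, ports, and file paths are placeholders:

./myglowapp -glow \
    -glow.leader=leader.example.com:8930 \
    -glow.driver.host=driver.example.com \
    -glow.driver.port=8931 \
    -cert.file=driver.crt -key.file=driver.key -ca.file=ca.crt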

type FlowContextDriver struct {
option *DriverOption
Option *DriverOption

stepGroups []*plan.StepGroup
taskGroups []*plan.TaskGroup
}

func NewFlowContextDriver(option *DriverOption) *FlowContextDriver {
return &FlowContextDriver{option: option}
return &FlowContextDriver{Option: option}
}

func (fcd *FlowContextDriver) IsDriverMode() bool {
return fcd.option.ShouldStart
return fcd.Option.ShouldStart
}

func (fcd *FlowContextDriver) IsDriverPlotMode() bool {
return fcd.option.PlotOutput
return fcd.Option.PlotOutput
}

func (fcd *FlowContextDriver) checkParameters() {
if fcd.Option.CertFiles.IsEnabled() {
if fcd.Option.Host == "" {
log.Fatalf("Usage Note: -glow.driver.host option is needed and must match CN in the certificate.")
}
}
}
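The CN requirement above presumably exists because agents connect back to the driver over TLS and verify the driver's certificate against the host they were given. A small sketch, outside the commit, for inspecting the CN of a PEM certificate (the file name is a placeholder):

package main

import (
    "crypto/x509"
    "encoding/pem"
    "fmt"
    "io/ioutil"
    "log"
)

func main() {
    pemBytes, err := ioutil.ReadFile("driver.crt") // placeholder path
    if err != nil {
        log.Fatal(err)
    }
    block, _ := pem.Decode(pemBytes)
    if block == nil {
        log.Fatal("no PEM block found")
    }
    cert, err := x509.ParseCertificate(block.Bytes)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("CN:", cert.Subject.CommonName) // should equal -glow.driver.host
}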

func (fcd *FlowContextDriver) Plot(fc *flow.FlowContext) {
@@ -79,33 +90,38 @@ func (fcd *FlowContextDriver) Plot(fc *flow.FlowContext) {
// driver runs on local, controlling all tasks
func (fcd *FlowContextDriver) Run(fc *flow.FlowContext) {

fcd.checkParameters()

// task fusion to minimize disk IO
fcd.stepGroups, fcd.taskGroups = plan.GroupTasks(fc)
// plot the execution graph
if fcd.option.PlotOutput {
if fcd.Option.PlotOutput {
plan.PlotGraph(fcd.taskGroups, fc)
return
}

tlsConfig := fcd.option.CertFiles.MakeTLSConfig()
tlsConfig := fcd.Option.CertFiles.MakeTLSConfig()
util.SetupHttpClient(tlsConfig)

// start server to serve files to agents to run executors
rsyncServer, err := rsync.NewRsyncServer(fcd.option.ListenOn, os.Args[0], fcd.option.RelatedFileNames())
rsyncServer, err := rsync.NewRsyncServer(os.Args[0], fcd.Option.RelatedFileNames())
if err != nil {
log.Fatalf("Failed to start local server: %v", err)
}
rsyncServer.Start()
rsyncServer.StartRsyncServer(tlsConfig, fcd.Option.Host+":"+strconv.Itoa(fcd.Option.Port))

driverHost := fcd.Option.Host

// create the scheduler
sched := scheduler.NewScheduler(
fcd.option.Leader,
fcd.Option.Leader,
&scheduler.SchedulerOption{
DataCenter: fcd.option.DataCenter,
Rack: fcd.option.Rack,
TaskMemoryMB: fcd.option.TaskMemoryMB,
DataCenter: fcd.Option.DataCenter,
Rack: fcd.Option.Rack,
TaskMemoryMB: fcd.Option.TaskMemoryMB,
DriverHost: driverHost,
DriverPort: rsyncServer.Port,
Module: fcd.option.Module,
Module: fcd.Option.Module,
ExecutableFile: os.Args[0],
ExecutableFileHash: rsyncServer.ExecutableFileHash(),
TlsConfig: tlsConfig,
@@ -131,7 +147,7 @@ func (fcd *FlowContextDriver) Run(fc *flow.FlowContext) {
sched.EventChan <- scheduler.SubmitTaskGroup{
FlowContext: fc,
TaskGroup: taskGroup,
Bid: fcd.option.FlowBid / float64(len(fcd.taskGroups)),
Bid: fcd.Option.FlowBid / float64(len(fcd.taskGroups)),
WaitGroup: &wg,
}
}
@@ -141,7 +157,7 @@ func (fcd *FlowContextDriver) Run(fc *flow.FlowContext) {

fcd.CloseOutputChannels(fc)

if fcd.option.ShowFlowStats {
if fcd.Option.ShowFlowStats {
fcd.ShowFlowStatus(fc, sched)
}

13 changes: 7 additions & 6 deletions driver/context_driver_on_interrupt.go
@@ -1,6 +1,7 @@
package driver

import (
"crypto/tls"
"fmt"
"sync"
"time"
@@ -40,7 +41,7 @@ func (fcd *FlowContextDriver) OnExit(
return
}
// println("checking", request.Allocation.Location.URL(), requestId)
if err := askExecutorToStopRequest(request.Allocation.Location.URL(), requestId); err != nil {
if err := askExecutorToStopRequest(sched.Option.TlsConfig, request.Allocation.Location.URL(), requestId); err != nil {
fmt.Printf("Error to stop request %d on %s: %v\n", request.Allocation.Location.URL(), requestId, err)
return
}
@@ -99,7 +100,7 @@ func (fcd *FlowContextDriver) collectStatusFromRemoteExecutors(sched *scheduler.
return
}
// println("checking", request.Allocation.Location.URL(), requestId)
stat, err := askExecutorStatusForRequest(request.Allocation.Location.URL(), requestId)
stat, err := askExecutorStatusForRequest(sched.Option.TlsConfig, request.Allocation.Location.URL(), requestId)
if err != nil {
fmt.Printf("Error to request status from %s: %v\n", request.Allocation.Location.URL(), err)
return
@@ -114,9 +115,9 @@ func (fcd *FlowContextDriver) collectStatusFromRemoteExecutors(sched *scheduler.
return stats
}

func askExecutorStatusForRequest(server string, requestId uint32) (*RemoteExecutorStatus, error) {
func askExecutorStatusForRequest(tlsConfig *tls.Config, server string, requestId uint32) (*RemoteExecutorStatus, error) {

reply, err := scheduler.RemoteDirectCommand(server, scheduler.NewGetStatusRequest(requestId))
reply, err := scheduler.RemoteDirectCommand(tlsConfig, server, scheduler.NewGetStatusRequest(requestId))
if err != nil {
return nil, err
}
@@ -134,7 +135,7 @@ }
}, nil
}

func askExecutorToStopRequest(server string, requestId uint32) (err error) {
_, err = scheduler.RemoteDirectCommand(server, scheduler.NewStopRequest(requestId))
func askExecutorToStopRequest(tlsConfig *tls.Config, server string, requestId uint32) (err error) {
_, err = scheduler.RemoteDirectCommand(tlsConfig, server, scheduler.NewStopRequest(requestId))
return
}
20 changes: 15 additions & 5 deletions driver/rsync/http_server.go
@@ -3,6 +3,7 @@
package rsync

import (
"crypto/tls"
"encoding/hex"
"hash/crc32"
"io"
@@ -11,6 +12,7 @@ import (
"net/http"
"os"
"path/filepath"
"strings"
"time"

"github.com/chrislusf/glow/util"
@@ -23,6 +25,7 @@ type FileHash struct {
}

type RsyncServer struct {
Ip string
Port int
listenOn string
ExecutableFile string
@@ -31,9 +34,8 @@ type RsyncServer struct {
fileHashes []FileHash
}

func NewRsyncServer(listenOn string, file string, relatedFiles []string) (*RsyncServer, error) {
func NewRsyncServer(file string, relatedFiles []string) (*RsyncServer, error) {
rs := &RsyncServer{
listenOn: listenOn,
ExecutableFile: file,
RelatedFiles: relatedFiles,
}
Expand Down Expand Up @@ -85,17 +87,25 @@ func (rs *RsyncServer) fileHandler(w http.ResponseWriter, r *http.Request) {
}

// go start a http server locally that will respond predictably to ranged requests
func (rs *RsyncServer) Start() {
func (rs *RsyncServer) StartRsyncServer(tlsConfig *tls.Config, listenOn string) {
s := http.NewServeMux()
s.HandleFunc("/list", rs.listHandler)
s.HandleFunc("/file/", rs.fileHandler)

listener, err := net.Listen("tcp", rs.listenOn)
var listener net.Listener
var err error
if tlsConfig == nil {
listener, err = net.Listen("tcp", listenOn)
} else {
listener, err = tls.Listen("tcp", listenOn, tlsConfig)
}
if err != nil {
log.Fatal(err)
}

rs.Port = listener.Addr().(*net.TCPAddr).Port
addr := listener.Addr().(*net.TCPAddr)
rs.Ip = addr.String()[:strings.LastIndex(addr.String(), ":")]
rs.Port = addr.Port

go func() {
http.Serve(listener, s)
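Agents fetch the driver executable and related files from the /list and /file/ routes registered above. A minimal client-side sketch, not from the repository, of downloading one file over 2-way SSL with the same kind of tls.Config; the URL and output path are placeholders:

package main

import (
    "crypto/tls"
    "io"
    "log"
    "net/http"
    "os"
)

// download fetches url into dest; tlsConfig carries the agent's certificate so
// the driver's server can require and verify it (the 2-way part of the SSL).
func download(tlsConfig *tls.Config, url, dest string) error {
    client := &http.Client{Transport: &http.Transport{TLSClientConfig: tlsConfig}}
    resp, err := client.Get(url)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    out, err := os.Create(dest)
    if err != nil {
        return err
    }
    defer out.Close()
    _, err = io.Copy(out, resp.Body)
    return err
}

func main() {
    var tlsConfig *tls.Config // would come from a helper like makeMutualTLSConfig above
    if err := download(tlsConfig, "https://driver.example.com:8931/file/myglowapp", "myglowapp"); err != nil {
        log.Fatal(err)
    }
}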