Skip to content

Commit

Permalink
Support Management API and flow statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
Eric Fu committed Jan 28, 2017
1 parent 5c6f652 commit 2d0b6a3
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 9 deletions.
157 changes: 150 additions & 7 deletions cmd/shadowsocks-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"flag"
"fmt"
Expand Down Expand Up @@ -38,6 +39,7 @@ const (

var debug ss.DebugLog
var udp bool
var managerAddr string

func getRequest(conn *ss.Conn, auth bool) (host string, ota bool, err error) {
ss.SetReadTimeout(conn)
Expand Down Expand Up @@ -108,7 +110,7 @@ const logCntDelta = 100
var connCnt int
var nextLogConnCnt int = logCntDelta

func handleConnection(conn *ss.Conn, auth bool) {
func handleConnection(conn *ss.Conn, auth bool, port string) {
var host string

connCnt++ // this maybe not accurate, but should be enough
Expand Down Expand Up @@ -169,11 +171,19 @@ func handleConnection(conn *ss.Conn, auth bool) {
debug.Printf("piping %s<->%s ota=%v connOta=%v", conn.RemoteAddr(), host, ota, conn.IsOta())
}
if ota {
go ss.PipeThenCloseOta(conn, remote)
go func() {
flow := ss.PipeThenCloseOta(conn, remote)
passwdManager.addFlow(port, flow)
}()
} else {
go ss.PipeThenClose(conn, remote)
}
ss.PipeThenClose(remote, conn)
go func() {
flow := ss.PipeThenClose(conn, remote)
passwdManager.addFlow(port, flow)
}()
}
flow := ss.PipeThenClose(remote, conn)
passwdManager.addFlow(port, flow)

closed = true
return
}
Expand All @@ -192,11 +202,13 @@ type PasswdManager struct {
sync.Mutex
portListener map[string]*PortListener
udpListener map[string]*UDPListener
flowStats map[string]int64
}

func (pm *PasswdManager) add(port, password string, listener net.Listener) {
pm.Lock()
pm.portListener[port] = &PortListener{password, listener}
pm.flowStats[port] = 0
pm.Unlock()
}

Expand Down Expand Up @@ -235,12 +247,30 @@ func (pm *PasswdManager) del(port string) {
pl.listener.Close()
pm.Lock()
delete(pm.portListener, port)
delete(pm.flowStats, port)
if udp {
delete(pm.udpListener, port)
}
pm.Unlock()
}

func (pm *PasswdManager) addFlow(port string, n int) {
pm.Lock()
pm.flowStats[port] = pm.flowStats[port] + int64(n)
pm.Unlock()
return
}

func (pm *PasswdManager) getFlowStats() map[string]int64 {
pm.Lock()
copy := make(map[string]int64)
for k, v := range pm.flowStats {
copy[k] = v
}
pm.Unlock()
return copy
}

// Update port password would first close a port and restart listening on that
// port. A different approach would be directly change the password used by
// that port, but that requires **sharing** password between the port listener
Expand All @@ -266,7 +296,11 @@ func (pm *PasswdManager) updatePortPasswd(port, password string, auth bool) {
}
}

var passwdManager = PasswdManager{portListener: map[string]*PortListener{}, udpListener: map[string]*UDPListener{}}
var passwdManager = PasswdManager{
portListener: map[string]*PortListener{},
udpListener: map[string]*UDPListener{},
flowStats: map[string]int64{},
}

func updatePasswd() {
log.Println("updating password")
Expand Down Expand Up @@ -335,7 +369,7 @@ func run(port, password string, auth bool) {
continue
}
}
go handleConnection(ss.NewConn(conn, cipher.Copy()), auth)
go handleConnection(ss.NewConn(conn, cipher.Copy()), auth, port)
}
}

Expand Down Expand Up @@ -405,6 +439,7 @@ func main() {
flag.IntVar(&core, "core", 0, "maximum number of CPU cores to use, default is determinied by Go runtime")
flag.BoolVar((*bool)(&debug), "d", false, "print debug message")
flag.BoolVar(&udp, "u", false, "UDP Relay")
flag.StringVar(&managerAddr, "manager-address", "", "shadowsocks manager listening address")
flag.Parse()

if printVer {
Expand Down Expand Up @@ -451,5 +486,113 @@ func main() {
}
}

if managerAddr != "" {
addr, err := net.ResolveUDPAddr("udp", managerAddr)
if err != nil {
fmt.Fprintln(os.Stderr, "Can't resolve address: ", err)
os.Exit(1)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
fmt.Fprintln(os.Stderr, "Error listening:", err)
os.Exit(1)
}
log.Printf("manager listening udp addr %v ...\n", managerAddr)
defer conn.Close()
go managerDaemon(conn)
}

waitSignal()
}

func managerDaemon(conn *net.UDPConn) {
for {
data := make([]byte, 300)
_, remote, err := conn.ReadFromUDP(data)
if err != nil {
fmt.Fprintln(os.Stderr, "Failed to read UDP manage msg, error: ", err.Error())
continue
}
command := string(data)
var res []byte
switch {
case strings.HasPrefix(command, "add:"):
res = handleAddPort(bytes.Trim(data[4:], "\x00\r\n "))
case strings.HasPrefix(command, "remove:"):
res = handleRemovePort(bytes.Trim(data[7:], "\x00\r\n "))
case strings.HasPrefix(command, "ping"):
res = handlePing()
}
if len(res) == 0 {
continue
}
_, err = conn.WriteToUDP(res, remote)
if err != nil {
fmt.Fprintln(os.Stderr, "Failed to write UDP manage msg, error: ", err.Error())
continue
}
}
}

func handleAddPort(payload []byte) []byte {
var params struct {
ServerPort interface{} `json:"server_port"` // may be string or int
Password string `json:"password"`
}
json.Unmarshal(payload, &params)
if params.ServerPort == nil || params.Password == "" {
fmt.Fprintln(os.Stderr, "Failed to parse add req: ", string(payload))
return []byte("err")
}
port := parsePortNum(params.ServerPort)
if port == "" {
return []byte("err")
}
passwdManager.updatePortPasswd(port, params.Password, config.Auth)
return []byte("ok")
}

func handleRemovePort(payload []byte) []byte {
var params struct {
ServerPort interface{} `json:"server_port"` // may be string or int
}
json.Unmarshal(payload, &params)
if params.ServerPort == nil {
fmt.Fprintln(os.Stderr, "Failed to parse remove req: ", string(payload))
return []byte("err")
}
port := parsePortNum(params.ServerPort)
if port == "" {
return []byte("err")
}
log.Printf("closing port %s\n", port)
passwdManager.del(port)
return []byte("ok")
}

func handlePing() []byte {
stats := passwdManager.getFlowStats()
var buf bytes.Buffer
buf.WriteString("stat: ")
ret, _ := json.Marshal(stats)
buf.Write(ret)
return buf.Bytes()
}

func parsePortNum(in interface{}) string {
var port string
switch in.(type) {
case string:
// try to convert to number then convert back, to ensure valid value
portNum, err := strconv.Atoi(in.(string))
if portNum == 0 || err != nil {
return ""
}
port = strconv.Itoa(portNum)
case float64:
port = strconv.Itoa(int(in.(float64)))
default:
return ""
}
return port
}
10 changes: 8 additions & 2 deletions shadowsocks/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ func SetReadTimeout(c net.Conn) {
}

// PipeThenClose copies data from src to dst, closes dst when done.
func PipeThenClose(src, dst net.Conn) {
func PipeThenClose(src, dst net.Conn) int {
defer dst.Close()
buf := leakyBuf.Get()
defer leakyBuf.Put(buf)
var flow int
for {
SetReadTimeout(src)
n, err := src.Read(buf)
flow += n
// read may return EOF with n > 0
// should always process n > 0 bytes before handling error
if n > 0 {
Expand All @@ -43,10 +45,11 @@ func PipeThenClose(src, dst net.Conn) {
break
}
}
return flow
}

// PipeThenClose copies data from src to dst, closes dst when done, with ota verification.
func PipeThenCloseOta(src *Conn, dst net.Conn) {
func PipeThenCloseOta(src *Conn, dst net.Conn) int {
const (
dataLenLen = 2
hmacSha1Len = 10
Expand All @@ -59,6 +62,7 @@ func PipeThenCloseOta(src *Conn, dst net.Conn) {
// sometimes it have to fill large block
buf := leakyBuf.Get()
defer leakyBuf.Put(buf)
var flow int
for i := 1; ; i += 1 {
SetReadTimeout(src)
if n, err := io.ReadFull(src, buf[:dataLenLen+hmacSha1Len]); err != nil {
Expand All @@ -84,6 +88,7 @@ func PipeThenCloseOta(src *Conn, dst net.Conn) {
Debug.Printf("conn=%p #%v read data error n=%v: %v", src, i, n, err)
break
}
flow += int(dataLen)
chunkIdBytes := make([]byte, 4)
chunkId := src.GetAndIncrChunkId()
binary.BigEndian.PutUint32(chunkIdBytes, chunkId)
Expand All @@ -97,4 +102,5 @@ func PipeThenCloseOta(src *Conn, dst net.Conn) {
break
}
}
return flow
}

0 comments on commit 2d0b6a3

Please sign in to comment.