Skip to content

Commit

Permalink
fftw: support rate params to limit bandwidth
Browse files Browse the repository at this point in the history
  • Loading branch information
fatedier committed Mar 19, 2019
1 parent 65ea283 commit d3483b6
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 9 deletions.
1 change: 1 addition & 0 deletions cmd/fftw/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func init() {
rootCmd.PersistentFlags().StringVarP(&options.ServerAddr, "server_addr", "s", version.DefaultServerAddr(), "remote fft server address")
rootCmd.PersistentFlags().StringVarP(&options.BindAddr, "bind_addr", "b", "0.0.0.0:7778", "bind address")
rootCmd.PersistentFlags().StringVarP(&options.AdvicePublicIP, "advice_public_ip", "p", "", "fft worker's advice public ip")
rootCmd.PersistentFlags().IntVarP(&options.RateKB, "rate", "", 2048, "max bandwidth fftw will provide, unit is KB, default is 2048KB and min value is 50KB")

rootCmd.PersistentFlags().StringVarP(&options.LogFile, "log_file", "", "console", "log file path")
rootCmd.PersistentFlags().StringVarP(&options.LogLevel, "log_level", "", "info", "log level")
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ require (
github.com/mattn/go-runewidth v0.0.4 // indirect
github.com/spf13/cobra v0.0.3
github.com/spf13/pflag v1.0.3 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
gopkg.in/cheggaaa/pb.v1 v1.0.28 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
gopkg.in/cheggaaa/pb.v1 v1.0.28 h1:n1tBJnnK2r7g9OW2btFH91V92STTUevLXYFb8gy9EMk=
gopkg.in/cheggaaa/pb.v1 v1.0.28/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
33 changes: 33 additions & 0 deletions pkg/io/limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io

import (
"context"
"io"

"golang.org/x/time/rate"
)

type RateReader struct {
underlying io.Reader
limiter *rate.Limiter
}

func NewRateReader(r io.Reader, limiter *rate.Limiter) *RateReader {
return &RateReader{
underlying: r,
limiter: limiter,
}
}

func (rr *RateReader) Read(p []byte) (n int, err error) {
n, err = rr.underlying.Read(p)
if err != nil {
return
}

err = rr.limiter.WaitN(context.Background(), n)
if err != nil {
return
}
return
}
34 changes: 26 additions & 8 deletions worker/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ package worker

import (
"fmt"
"io"
"net"
"sync"
"time"

rateio "github.com/fatedier/fft/pkg/io"
"github.com/fatedier/fft/pkg/log"
"github.com/fatedier/fft/pkg/msg"

gio "github.com/fatedier/golib/io"

"golang.org/x/time/rate"
)

type TransferConn struct {
Expand All @@ -31,12 +36,17 @@ func NewTransferConn(id string, conn net.Conn, isSender bool) *TransferConn {
type MatchController struct {
conns map[string]*TransferConn

mu sync.Mutex
rateLimit *rate.Limiter
mu sync.Mutex
}

func NewMatchController() *MatchController {
func NewMatchController(rateByte int) *MatchController {
if rateByte < 50*1024 {
rateByte = 50 * 1024
}
return &MatchController{
conns: make(map[string]*TransferConn),
conns: make(map[string]*TransferConn),
rateLimit: rate.NewLimiter(rate.Limit(float64(rateByte)), 16*1024),
}
}

Expand All @@ -54,15 +64,23 @@ func (mc *MatchController) DealTransferConn(tc *TransferConn, timeout time.Durat
if !ok {
select {
case pairConn := <-tc.pairConnCh:
var sender, receiver io.ReadWriteCloser
if tc.isSender {
msg.WriteMsg(tc.conn, &msg.NewSendFileStreamResp{})
msg.WriteMsg(pairConn.conn, &msg.NewReceiveFileStreamResp{})
sender = gio.WrapReadWriteCloser(rateio.NewRateReader(tc.conn, mc.rateLimit), tc.conn, func() error {
return tc.conn.Close()
})
receiver = pairConn.conn
} else {
msg.WriteMsg(tc.conn, &msg.NewReceiveFileStreamResp{})
msg.WriteMsg(pairConn.conn, &msg.NewSendFileStreamResp{})
sender = gio.WrapReadWriteCloser(rateio.NewRateReader(pairConn.conn, mc.rateLimit), pairConn.conn, func() error {
return pairConn.conn.Close()
})
receiver = tc.conn
}
msg.WriteMsg(sender, &msg.NewSendFileStreamResp{})
msg.WriteMsg(receiver, &msg.NewReceiveFileStreamResp{})

go func() {
gio.Join(tc.conn, pairConn.conn)
gio.Join(sender, receiver)
log.Info("ID [%s] join pair connections closed", tc.id)
}()
case <-time.After(timeout):
Expand Down
6 changes: 5 additions & 1 deletion worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Options struct {
ServerAddr string
BindAddr string
AdvicePublicIP string
RateKB int // xx KB/s

LogFile string
LogLevel string
Expand All @@ -30,6 +31,9 @@ func (op *Options) Check() error {
if op.LogMaxDays <= 0 {
op.LogMaxDays = 3
}
if op.RateKB < 50 {
return fmt.Errorf("rate should greater than 50KB")
}
return nil
}

Expand Down Expand Up @@ -64,7 +68,7 @@ func NewService(options Options) (*Service, error) {
advicePublicIP: options.AdvicePublicIP,

l: l,
matchCtl: NewMatchController(),
matchCtl: NewMatchController(options.RateKB * 1024),
tlsConfig: generateTLSConfig(),
}, nil
}
Expand Down

0 comments on commit d3483b6

Please sign in to comment.