Skip to content

Commit

Permalink
Improve throttling and add Title support (microsoft#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
ipankajg authored Dec 5, 2020
1 parent 46d0e62 commit aa63660
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 39 deletions.
37 changes: 21 additions & 16 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,10 @@ func runTCPBandwidthTestHandler(test *ethrTest, conn net.Conn, wg *sync.WaitGrou
for i := uint32(0); i < size; i++ {
buff[i] = byte(i)
}
start, waitTime, sendRate := beginThrottle()
bufferLen := len(buff)
totalBytesToSend := test.clientParam.BwRate
sentBytes := uint64(0)
start, waitTime, bytesToSend := beginThrottle(totalBytesToSend, bufferLen)
ExitForLoop:
for {
select {
Expand All @@ -273,19 +276,19 @@ ExitForLoop:
n := 0
var err error = nil
if test.clientParam.Reverse {
n, err = io.ReadFull(conn, buff)
n, err = conn.Read(buff)
} else {
n, err = conn.Write(buff)
n, err = conn.Write(buff[:bytesToSend])
}
if err != nil || n < int(size) {
if err != nil {
ui.printDbg("Error sending/receiving data on a connection for bandwidth test: %v", err)
break ExitForLoop
}
atomic.AddUint64(&ec.bw, uint64(size))
atomic.AddUint64(&test.testResult.bw, uint64(size))
sendRate += uint64(size)
if test.clientParam.BwRate > 0 && !test.clientParam.Reverse && sendRate >= test.clientParam.BwRate {
start, waitTime, sendRate = enforceThrottle(start, waitTime)
atomic.AddUint64(&ec.bw, uint64(n))
atomic.AddUint64(&test.testResult.bw, uint64(n))
if !test.clientParam.Reverse {
sentBytes += uint64(n)
start, waitTime, sentBytes, bytesToSend = enforceThrottle(start, waitTime, totalBytesToSend, sentBytes, bufferLen)
}
}
}
Expand Down Expand Up @@ -968,30 +971,32 @@ func runUDPBandwidthAndPpsTest(test *ethrTest) {
lserver, lport, _ := net.SplitHostPort(conn.LocalAddr().String())
ui.printMsg("[%3d] local %s port %s connected to %s port %s",
ec.fd, lserver, lport, rserver, rport)
blen := len(buff)
start, waitTime, sendRate := beginThrottle()
bufferLen := len(buff)
totalBytesToSend := test.clientParam.BwRate
sentBytes := uint64(0)
start, waitTime, bytesToSend := beginThrottle(totalBytesToSend, bufferLen)
ExitForLoop:
for {
select {
case <-test.done:
break ExitForLoop
default:
n, err := conn.Write(buff)
n, err := conn.Write(buff[:bytesToSend])
if err != nil {
ui.printDbg("%v", err)
continue
}
if n < blen {
if n < bytesToSend {
ui.printDbg("Partial write: %d", n)
continue
}
atomic.AddUint64(&ec.bw, uint64(n))
atomic.AddUint64(&ec.pps, 1)
atomic.AddUint64(&test.testResult.bw, uint64(n))
atomic.AddUint64(&test.testResult.pps, 1)
sendRate += uint64(size)
if test.clientParam.BwRate > 0 && !test.clientParam.Reverse && sendRate >= test.clientParam.BwRate {
start, waitTime, sendRate = enforceThrottle(start, waitTime)
if !test.clientParam.Reverse {
sentBytes += uint64(n)
start, waitTime, sentBytes, bytesToSend = enforceThrottle(start, waitTime, totalBytesToSend, sentBytes, bufferLen)
}
}
}
Expand Down
7 changes: 0 additions & 7 deletions ethr.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,6 @@ func main() {
bwRate /= 8
}

// Adjust the numbers so that data can be transfered in equal units.
if bwRate > 0 {
factor := (bwRate + bufLen - 1) / bufLen
bufLen = bwRate / factor
bwRate = bufLen * factor
}

//
// For Pkt/s, we always override the buffer size to be just 1 byte.
// TODO: Evaluate in future, if we need to support > 1 byte packets for
Expand Down
23 changes: 17 additions & 6 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,22 +149,26 @@ func srvrRunTCPBandwidthTest(test *ethrTest, clientParam EthrClientParam, conn n
for i := uint32(0); i < size; i++ {
buff[i] = byte(i)
}
start, waitTime, sendRate := beginThrottle()
bufferLen := len(buff)
totalBytesToSend := test.clientParam.BwRate
sentBytes := uint64(0)
start, waitTime, bytesToSend := beginThrottle(totalBytesToSend, bufferLen)
for {
n := 0
var err error
if clientParam.Reverse {
_, err = conn.Write(buff)
n, err = conn.Write(buff[:bytesToSend])
} else {
_, err = io.ReadFull(conn, buff)
n, err = conn.Read(buff)
}
if err != nil {
ui.printDbg("Error sending/receiving data on a connection for bandwidth test: %v", err)
break
}
sendRate += uint64(size)
atomic.AddUint64(&test.testResult.bw, uint64(size))
if clientParam.BwRate > 0 && clientParam.Reverse && sendRate >= clientParam.BwRate {
start, waitTime, sendRate = enforceThrottle(start, waitTime)
if clientParam.Reverse {
sentBytes += uint64(n)
start, waitTime, sentBytes, bytesToSend = enforceThrottle(start, waitTime, totalBytesToSend, sentBytes, bufferLen)
}
}
}
Expand Down Expand Up @@ -239,6 +243,12 @@ func srvrRunUDPServer() error {
ui.printDbg("Error listening on %s for UDP pkt/s tests: %v", gEthrPortStr, err)
return err
}
// Set socket buffer to 4MB per CPU so we can queue 4MB per CPU in case Ethr is not
// able to keep up temporarily.
err = l.SetReadBuffer(runtime.NumCPU() * 4 * 1024 * 1024)
if err != nil {
ui.printDbg("Failed to set ReadBuffer on UDP socket: %v", err)
}
//
// We use NumCPU here instead of NumThreads passed from client. The
// reason is that for UDP, there is no connection, so all packets come
Expand Down Expand Up @@ -289,6 +299,7 @@ func srvrRunUDPPacketHandler(conn *net.UDPConn) {
ui.printDbg("Error receiving data from UDP for bandwidth test: %v", err)
continue
}
ethrUnused(remoteIP)
ethrUnused(n)
server, port, _ := net.SplitHostPort(remoteIP.String())
test, found := tests[server]
Expand Down
46 changes: 36 additions & 10 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,13 @@ func ethrDialEx(p EthrProtocol, dialAddr, localIP string, localPortNum uint16, t
if ok {
tcpconn.SetLinger(0)
}
udpconn, ok := conn.(*net.UDPConn)
if ok {
err = udpconn.SetWriteBuffer(4 * 1024 * 1024)
if err != nil {
ui.printDbg("Failed to set ReadBuffer on UDP socket: %v", err)
}
}
}
return
}
Expand Down Expand Up @@ -432,20 +439,39 @@ func ethrLookupIP(server string) (net.IPAddr, string, error) {
// This is a workaround to ensure we generate traffic at certain rate
// and stats are printed correctly. We ensure that current interval lasts
// 100ms after stats are printed, not perfect but workable.
func beginThrottle() (start time.Time, waitTime time.Duration, sendRate uint64) {
func beginThrottle(totalBytesToSend uint64, bufferLen int) (start time.Time, waitTime time.Duration, bytesToSend int) {
start = time.Now()
waitTime = time.Until(lastStatsTime.Add(time.Second + 100*time.Millisecond))
sendRate = uint64(0)
waitTime = time.Until(lastStatsTime.Add(time.Second + 50*time.Millisecond))
bytesToSend = bufferLen
if totalBytesToSend > 0 && totalBytesToSend < uint64(bufferLen) {
bytesToSend = int(totalBytesToSend)
}
return
}

func enforceThrottle(s time.Time, wt time.Duration) (start time.Time, waitTime time.Duration, sendRate uint64) {
timeTaken := time.Since(s)
if timeTaken < wt {
time.Sleep(wt - timeTaken)
func enforceThrottle(s time.Time, wt time.Duration, totalBytesToSend, oldSentBytes uint64, bufferLen int) (start time.Time, waitTime time.Duration, newSentBytes uint64, bytesToSend int) {
start = s
waitTime = wt
newSentBytes = oldSentBytes
bytesToSend = bufferLen
if totalBytesToSend > 0 {
remainingBytes := totalBytesToSend - oldSentBytes
if remainingBytes > 0 {
if remainingBytes < uint64(bufferLen) {
bytesToSend = int(remainingBytes)
}
} else {
timeTaken := time.Since(s)
if timeTaken < wt {
time.Sleep(wt - timeTaken)
}
start = time.Now()
waitTime = time.Until(lastStatsTime.Add(time.Second + 50*time.Millisecond))
newSentBytes = 0
if totalBytesToSend < uint64(bufferLen) {
bytesToSend = int(totalBytesToSend)
}
}
}
start = time.Now()
waitTime = time.Until(lastStatsTime.Add(time.Second + 100*time.Millisecond))
sendRate = 0
return
}

0 comments on commit aa63660

Please sign in to comment.