Skip to content

Commit

Permalink
implement software rate control
Browse files Browse the repository at this point in the history
TODO: this is still somewhat slow
  • Loading branch information
emmericp committed Mar 1, 2016
1 parent d538d00 commit bf7de1e
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ SET(FILES
src/pipe.cpp
src/lock.cpp
src/namespaces.cpp
src/ring.c
src/rate_limiter.cpp
)

SET(DPDK_LIBS
Expand Down
23 changes: 18 additions & 5 deletions examples/rate-control-methods.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,34 @@ local ts = require "timestamping"
local stats = require "stats"
local hist = require "histogram"
local log = require "log"
local limiter = require "ratelimiter"

local PKT_SIZE = 60
local ETH_DST = "11:12:13:14:15:16"

function master(txPort, rate, rc, pattern, threads)
if not txPort or not rate or not rc or (pattern ~= "cbr" and pattern ~= "poisson") then
return print("usage: txPort rate hw|moongen cbr|poisson [threads]")
return print("usage: txPort rate hw|sw|moongen cbr|poisson [threads]")
end
rate = rate or 2
threads = threads or 1
if pattern == "cbr" and threads ~= 1 then
--return log:error("cbr only supports one thread")
return log:error("cbr only supports one thread")
end
local txDev = device.config{ port = txPort, txQueues = threads, disableOffloads = rc ~= "moongen" }
device.waitForLinks()
for i = 1, threads do
dpdk.launchLua("loadSlave", txDev:getTxQueue(i - 1), txDev, rate, rc, pattern, i)
local rateLimiter
if rc == "sw" then
rateLimiter = limiter:new(txDev:getTxQueue(i - 1), pattern == "cbr" and pattern, 1 / rate * 1000)
end
dpdk.launchLua("loadSlave", txDev:getTxQueue(i - 1), txDev, rate, rc, pattern, rateLimiter, i)
end
dpdk.waitForSlaves()
end

function loadSlave(queue, txDev, rate, rc, pattern, threadId)
local mem = memory.createMemPool(function(buf)
function loadSlave(queue, txDev, rate, rc, pattern, rateLimiter, threadId)
local mem = memory.createMemPool(8192 * 2, function(buf)
buf:getEthernetPacket():fill{
ethSrc = txDev,
ethDst = ETH_DST,
Expand All @@ -48,6 +53,14 @@ function loadSlave(queue, txDev, rate, rc, pattern, threadId)
queue:send(bufs)
if threadId == 1 then txCtr:update() end
end
elseif rc == "sw" then
local bufs = mem:bufArray(1024)
txCtr = stats:newDevTxCounter(txDev, "plain")
while dpdk.running() do
bufs:alloc(PKT_SIZE)
rateLimiter:send(bufs)
if threadId == 1 then txCtr:update() end
end
elseif rc == "moongen" then
txCtr = stats:newManualTxCounter(txDev, "plain")
local dist = pattern == "poisson" and poissonDelay or function(x) return x end
Expand Down
44 changes: 44 additions & 0 deletions lua/include/pipe.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,54 @@ ffi.cdef [[
void* peek(struct spsc_ptr_queue* queue);
uint8_t pop(struct spsc_ptr_queue* queue);
size_t count(struct spsc_ptr_queue* queue);

// DPDK SPSC ring
struct rte_ring { };
struct rte_ring* create_ring(uint32_t count, int32_t socket);
int ring_enqueue(struct rte_ring* r, struct rte_mbuf** obj, int n);
int ring_dequeue(struct rte_ring* r, struct rte_mbuf** obj, int n);
]]

local C = ffi.C

mod.packetRing = {}
local packetRing = mod.packetRing
packetRing.__index = packetRing

function mod:newPacketRing(size, socket)
size = size or 8192
socket = socket or -1
return setmetatable({
ring = C.create_ring(size, socket)
}, packetRing)
end

function mod:newPacketRingFromRing(ring)
return setmetatable({
ring = ring
}, packetRing)
end

-- FIXME: this is work-around for some bug with the serialization of nested objects
function mod:sendToPacketRing(ring, bufs)
C.ring_enqueue(ring, bufs.array, bufs.size);
end

function packetRing:send(bufs)
C.ring_enqueue(self.ring, bufs.array, bufs.size);
end

function packetRing:sendN(bufs, n)
C.ring_enqueue(self.ring, bufs.array, n);
end

function packetRing:recv(bufs)
error("NYI")
end

function packetRing:__serialize()
return "require'pipe'; return " .. serpent.addMt(serpent.dumpRaw(self), "require('pipe').packetRing"), true
end

mod.slowPipe = {}
local slowPipe = mod.slowPipe
Expand Down
58 changes: 58 additions & 0 deletions lua/include/ratelimiter.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
local ffi = require "ffi"
local pipe = require "pipe"
local dpdk = require "dpdk"
local serpent = require "Serpent"

local C = ffi.C

ffi.cdef[[
void rate_limiter_main_loop(struct rte_ring* ring, uint8_t device, uint16_t queue);
void rate_limiter_cbr_main_loop(struct rte_ring* ring, uint8_t device, uint16_t queue, uint32_t target);
]]

local mod = {}
local rateLimiter = {}
mod.rateLimiter = rateLimiter

rateLimiter.__index = rateLimiter

function rateLimiter:send(bufs)
pipe:sendToPacketRing(self.ring, bufs)
end

function rateLimiter:__serialize()
return "require 'ratelimiter'; return " .. serpent.addMt(serpent.dumpRaw(self), "require('ratelimiter').rateLimiter"), true
end

--- Create a new rate limiter that allows for precise inter-packet gap generation by wrapping a tx queue.
-- By default it uses packet delay information from buf:setDelay().
-- Can only be created from the master task because it spawns a separate thread.
-- @param queue the wrapped tx queue
-- @param mode optional, either "cbr" or "custom". Defaults to custom.
-- @param delay optional, inter-departure time in nanoseconds for mode == "cbr"
function mod:new(queue, mode, delay)
if mode and mode ~= "cbr" and mode ~= "custom" then
log:fatal("Unsupported mode " .. mode)
end
local ring = pipe:newPacketRing()
local obj = setmetatable({
ring = ring.ring,
mode = mode,
delay = delay,
queue = queue
}, rateLimiter)
dpdk.launchLua("__MG_RATE_LIMITER_MAIN", ring.ring, queue.id, queue.qid, mode, delay)
return obj
end


function __MG_RATE_LIMITER_MAIN(ring, devId, qid, mode, delay)
if mode then
C.rate_limiter_cbr_main_loop(ring, devId, qid, delay)
else
C.rate_limiter_main_loop(ring, devId, qid)
end
end

return mod

61 changes: 61 additions & 0 deletions src/rate_limiter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#include <rte_config.h>
#include <rte_common.h>
#include <rte_ring.h>
#include <rte_mbuf.h>
#include <rte_ethdev.h>
#include <rte_mempool.h>
#include <rte_ether.h>
#include <rte_cycles.h>
#include "ring.h"

namespace rate_limiter {
constexpr int batch_size = 64;

// FIXME: actually do the right thing
static inline void main_loop(struct rte_ring* ring, uint8_t device, uint16_t queue) {
struct rte_mbuf* bufs[batch_size];
while (1) {
int rc = ring_dequeue(ring, reinterpret_cast<void**>(bufs), batch_size);
if (rc == 0) {
uint32_t sent = 0;
while (sent < batch_size) {
sent += rte_eth_tx_burst(device, queue, bufs + sent, batch_size - sent);
}
}
}
}

static inline void main_loop_cbr(struct rte_ring* ring, uint8_t device, uint16_t queue, uint32_t target) {
uint64_t tsc_hz = rte_get_tsc_hz();
uint64_t id_cycles = (uint64_t) (target / (1000000000.0 / ((double) tsc_hz)));
uint64_t next_send = 0;
struct rte_mbuf* bufs[batch_size];
while (1) {
int rc = ring_dequeue(ring, reinterpret_cast<void**>(bufs), batch_size);
uint64_t cur = rte_get_tsc_cycles();
// nothing sent for 10 ms, restart rate control
if (cur - next_send > tsc_hz / 100) {
next_send = cur;
}
if (rc == 0) {
uint32_t sent = 0;
while (sent < batch_size) {
while ((cur = rte_get_tsc_cycles()) < next_send);
next_send += id_cycles;
sent += rte_eth_tx_burst(device, queue, bufs + sent, 1);
}
}
}
}
}

extern "C" {
void rate_limiter_cbr_main_loop(rte_ring* ring, uint8_t device, uint16_t queue, uint32_t target) {
rate_limiter::main_loop_cbr(ring, device, queue, target);
}

void rate_limiter_main_loop(rte_ring* ring, uint8_t device, uint16_t queue) {
rate_limiter::main_loop(ring, device, queue);
}
}

22 changes: 22 additions & 0 deletions src/ring.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#include <rte_config.h>
#include <rte_common.h>
#include <rte_ring.h>
#include "ring.h"

// DPDK SPSC bounded ring buffer

struct rte_ring* create_ring(uint32_t count, int32_t socket) {
static volatile uint32_t ring_cnt = 0;
char ring_name[32];
sprintf(ring_name, "mbuf_ring%d", __sync_fetch_and_add(&ring_cnt, 1));
return rte_ring_create(ring_name, count, socket, RING_F_SP_ENQ | RING_F_SC_DEQ);
}

int ring_enqueue(struct rte_ring* r, void* const* obj, int n) {
return rte_ring_sp_enqueue_bulk(r, obj, n);
}

int ring_dequeue(struct rte_ring* r, void** obj, int n) {
return rte_ring_sc_dequeue_bulk(r, obj, n);
}

20 changes: 20 additions & 0 deletions src/ring.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#ifndef MG_RING_H
#define MG_RING_H

#include <rte_config.h>
#include <rte_common.h>
#include <rte_ring.h>

#ifdef __cplusplus
extern "C" {
#endif

struct rte_ring* create_ring(uint32_t count, int32_t socket);
int ring_enqueue(struct rte_ring* r, void* const* obj, int n);
int ring_dequeue(struct rte_ring* r, void** obj, int n);

#ifdef __cplusplus
}
#endif

#endif

0 comments on commit bf7de1e

Please sign in to comment.