Skip to content

Commit

Permalink
Replace poll with epoll/kqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
karlseguin committed Nov 30, 2023
1 parent dc44e87 commit 48a98a4
Show file tree
Hide file tree
Showing 4 changed files with 288 additions and 131 deletions.
47 changes: 47 additions & 0 deletions src/conn.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
const std = @import("std");

const Self = @This();

pub const Conn = struct {
stream: std.net.Stream,
last_request: i64,
address: std.net.Address,
next: ?*Conn = null,
prev: ?*Conn = null,

pub const List = Self.List;
};

const List = struct {
len: usize = 0,
head: ?*Conn = null,
tail: ?*Conn = null,

pub fn insert(self: *List, node: *Conn) void {
if (self.tail) |tail| {
tail.next = node;
node.prev = tail;
self.tail = node;
} else {
self.head = node;
self.tail = node;
}
self.len += 1;
node.next = null;
}

pub fn remove(self: *List, node: *Conn) void {
if (node.prev) |prev| {
prev.next = node.next;
} else {
self.head = node.next;
}

if (node.next) |next| {
next.prev = node.prev;
} else {
self.tail = node.prev;
}
self.len -= 1;
}
};
65 changes: 26 additions & 39 deletions src/httpz.zig
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub const Url = @import("url.zig").Url;
pub const Config = @import("config.zig").Config;

const Allocator = std.mem.Allocator;
const Conn = @import("conn.zig").Conn;

pub const Protocol = enum {
HTTP10,
Expand Down Expand Up @@ -199,9 +200,6 @@ pub fn ServerCtx(comptime G: type, comptime R: type) type {
_errorHandler: ErrorHandlerAction(G),
_notFoundHandler: Action(G),
_cond: Thread.Condition,
_mutex: Thread.Mutex,
_threads: []Thread,
_workers: []Worker,

const Self = @This();
const Worker = @import("worker.zig").Worker(*Self);
Expand All @@ -219,22 +217,11 @@ pub fn ServerCtx(comptime G: type, comptime R: type) type {
var_config.address = "127.0.0.1";
}

const worker_count = config.pool.count orelse 2;

const threads = try allocator.alloc(Thread, worker_count);
errdefer allocator.free(threads);

const workers = try allocator.alloc(Worker, worker_count);
errdefer allocator.free(workers);

return .{
.ctx = ctx,
.config = var_config,
.allocator = allocator,
._cond = .{},
._mutex = .{},
._threads = threads,
._workers = workers,
._errorHandler = erh,
._notFoundHandler = nfh,
._router = try Router(G, R).init(allocator, dd, ctx),
Expand All @@ -243,20 +230,7 @@ pub fn ServerCtx(comptime G: type, comptime R: type) type {
}

pub fn deinit(self: *Self) void {
const allocator = self.allocator;
for (self._threads) |thrd| {
thrd.detach();
}
allocator.free(self._threads);

// this isn't right, but we'll deal with proper shutdown in the future
// since we have other things blocking this from working correctly.
for (self._workers) |*wrk| {
wrk.deinit();
}
allocator.free(self._workers);

self._router.deinit(allocator);
self._router.deinit(self.allocator);
}

pub fn listen(self: *Self) !void {
Expand All @@ -270,7 +244,7 @@ pub fn ServerCtx(comptime G: type, comptime R: type) type {
.kernel_backlog = 1024,
.force_nonblocking = true,
});
errdefer socket.deinit();
defer socket.deinit();

var no_delay = true;
const address = blk: {
Expand All @@ -295,35 +269,48 @@ pub fn ServerCtx(comptime G: type, comptime R: type) type {
try os.setsockopt(socket.sockfd.?, os.IPPROTO.TCP, 1, &std.mem.toBytes(@as(c_int, 1)));
}

const allocator = self.allocator;
const signal = try os.pipe();

const worker_count = config.pool.count orelse 2;
const workers = try allocator.alloc(Worker, worker_count);
const threads = try allocator.alloc(Thread, worker_count);

var started: usize = 0;
var workers = self._workers;
var threads = self._threads;

errdefer {
defer {
socket.close();
os.close(signal[1]);
for (0..started) |i| {
threads[i].join();
workers[i].deinit();
threads[i].detach();
}
allocator.free(workers);
allocator.free(threads);
}

const allocator = self.allocator;

for (0..workers.len) |i| {
workers[i] = try Worker.init(allocator, allocator, self, &config);
threads[i] = try Thread.spawn(.{}, Worker.run, .{&workers[i], &socket});
threads[i] = try Thread.spawn(.{}, Worker.run, .{&workers[i], &socket, signal[0]});
started += 1;
}

// TODO: figure out how to unblock this
// poll won't trigger if we close the listening socket
self._mutex.lock();
self._cond.wait(&self._mutex);
// is this really the best way?
var mutex = Thread.Mutex{};
mutex.lock();
self._cond.wait(&mutex);
mutex.unlock();
}

pub fn listenInNewThread(self: *Self) !std.Thread {
return try std.Thread.spawn(.{}, listen, .{self});
}

pub fn stop(self: *Self) void {
self._cond.signal();
}

pub fn notFound(self: *Self, nfa: Action(G)) void {
self._notFoundHandler = nfa;
}
Expand Down
4 changes: 2 additions & 2 deletions src/request.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1243,12 +1243,12 @@ fn expectParseError(expected: anyerror, input: []const u8, config: Config) !void
s.write(input);

const req_state = Request.State.init(t.arena, config) catch unreachable;
try t.expectError(expected, Request.parse(t.allocator, &req_state, s.conn));
try t.expectError(expected, Request.parse(t.allocator, &req_state, s.conn.stream, s.conn.address));
}

fn testRequest(config: Config, stream: t.Stream) !Request {
const req_state = Request.State.init(t.arena, config) catch unreachable;
return Request.parse(t.arena, &req_state, stream.conn);
return Request.parse(t.arena, &req_state, stream, stream.address);
}

fn testCleanup(r: Request) void {
Expand Down
Loading

0 comments on commit 48a98a4

Please sign in to comment.