Skip to content

Commit

Permalink
Add job
Browse files Browse the repository at this point in the history
  • Loading branch information
g41797 committed Dec 29, 2024
1 parent 329d48d commit f62eaa4
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 6 deletions.
51 changes: 45 additions & 6 deletions src/client.zig
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const err = @import("err.zig");
const ReturnedError = err.ReturnedError;
const parse = @import("parse.zig");
const tubename = @import("name.zig");
const Job = @import("job.zig").Job;

pub const DefaultAddr = "127.0.0.1";
pub const DafaultPort = 11300;
Expand Down Expand Up @@ -289,11 +290,40 @@ pub const Client = struct {
return tubes;
}

/// Returns job for processing from the watched tubes.
/// Receives job for processing from the watched tubes.
/// Client will block no more then 'timeout' seconds if job for processing does not exist.
pub fn reserve(cl: *Client, timeout: u32) !struct { id: u32, job: []const u8 } {
_ = cl;
_ = timeout;
pub fn reserve(cl: *Client, timeout: u32, job: *Job) !void {
cl.mutex.lock();
defer cl.mutex.unlock();

// reserve-with-timeout <seconds>\r\n
try cl.print_line("reserve-with-timeout {0d}", .{
timeout,
});
try cl.flush();

const linelen = try cl.read_line(cl.readLine[0..]);
if (!std.mem.startsWith(u8, cl.readLine[0..linelen], "RESERVED")) {
return err.findError(cl.readLine[0..linelen]);
}

const ret = try parse.parseSize(cl.readLine[0..linelen]);

const jsize: usize = ret[1];

try job.alloc(jsize);

try cl.read_buffer(job.buffer[0..job.len], jsize);

job.actual_len = jsize;

ret = try parse.parseSize(ret[0]);

const jid: usize = ret[1];

job.jid = jid;

return;
}

/// Removes a job from the server entirely.
Expand Down Expand Up @@ -443,8 +473,17 @@ pub const Client = struct {
if (cl.connection == null) {
return ReturnedError.CommunicationFailure;
}
_ = len;
_ = buffer;
if (len > 0) {
const rlen = try cl.connection.?.reader().readAtLeast(buffer, len);

if (rlen < len) {
return ReturnedError.NoCRLF;
}
}
var rn: [2]u8 = undefined;
_ = try cl.readLine(&rn[0..2], 2);

return;
}

fn connectTcp(client: *Client, host: []const u8, port: u16) !*Connection {
Expand Down
65 changes: 65 additions & 0 deletions src/job.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) 2024 g41797
// SPDX-License-Identifier: MIT

const std = @import("std");
const Allocator = std.mem.Allocator;

pub const Job = struct {
jid: ?u32 = null,
buffer: ?[]u8 = null,
len: usize = undefined,
actual_len: usize = 0,
allocator: Allocator = undefined,

pub fn alloc(job: *Job, len: usize) !void {
if (job.buffer == null) {
job.len = roundlen(len);
job.buffer = try job.allocator.alloc(u8, job.len);
return;
}

len = roundlen(len);

if (job.len >= len) {
return;
}

job.free();

job.len = len;

return job.alloc();
}

pub fn free(job: *Job) void {
if (job.buffer != null) {
job.allocator.free(job.buffer.?);
job.buffer = null;
}
return;
}

pub fn id(job: *Job) ?u32 {
if (job.buffer == null) {
return null;
}
if (job.id == null) {
return null;
}
return job.id.?;
}

pub fn body(job: *Job) ?[]u8 {
if (job.buffer == null) {
return null;
}
return job.buffer[0..job.actual_len];
}
};

inline fn roundlen(len: usize) usize {
if (len == 0) {
return 256;
}
return ((len / 256) + 1) * 256;
}
1 change: 1 addition & 0 deletions src/root.zig
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ const name = @import("name.zig");
const parse = @import("parse.zig");
const err = @import("err.zig");
const client = @import("client.zig");
const job = @import("job.zig");

0 comments on commit f62eaa4

Please sign in to comment.