Skip to content

Commit

Permalink
Merge branch 'master' of github.com:JuliaLang/julia
Browse files Browse the repository at this point in the history
  • Loading branch information
JeffBezanson committed Dec 8, 2015
2 parents 45a6383 + 61903fb commit 0ea14f8
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 0 deletions.
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1395,6 +1395,7 @@ export
@sync,
@async,
@task,
@threadcall,

# multiprocessing
@spawn,
Expand Down
2 changes: 2 additions & 0 deletions base/sysimg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ include("docs/basedocs.jl")

# threads
include("threads.jl")
include("threadcall.jl")

function __init__()
# Base library init
Expand All @@ -306,6 +307,7 @@ function __init__()
early_init()
init_load_path()
init_parallel()
init_threadcall()
end

include = include_from_node1
Expand Down
94 changes: 94 additions & 0 deletions base/threadcall.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
const max_ccall_threads = parse(Int, get(ENV, "UV_THREADPOOL_SIZE", "4"))
const thread_notifiers = [Nullable{Condition}() for i in 1:max_ccall_threads]
const threadcall_restrictor = Semaphore(max_ccall_threads)

function notify_fun(idx)
global thread_notifiers
notify(get(thread_notifiers[idx]))
return
end

function init_threadcall()
global c_notify_fun = cfunction(notify_fun, Void, (Cint,))
end

"""
@threadcall((cfunc, clib), rettype, (argtypes...), argvals...)
The `@threadcall` macro is called in the same way as `ccall` but does the work
in a different thread. This is useful when you want to call a blocking C
function without causing the main `julia` thread to become blocked. Concurrency
is limited by size of the libuv thread pool, which defaults to 4 threads but
can be increased by setting the `UV_THREADPOOL_SIZE` environment variable and
restarting the `julia` process.
"""
macro threadcall(f, rettype, argtypes, argvals...)
# check for usage errors
isa(argtypes,Expr) && argtypes.head == :tuple ||
error("threadcall: argument types must be a tuple")
length(argtypes.args) == length(argvals) ||
error("threadcall: wrong number of arguments to C function")

# hygiene escape arguments
f = esc(f)
rettype = esc(rettype)
argtypes = map(esc, argtypes.args)
argvals = map(esc, argvals)

# construct non-allocating wrapper to call C function
wrapper = :(function wrapper(args_ptr::Ptr{Void}, retval_ptr::Ptr{Void})
p = args_ptr
end)
body = wrapper.args[2].args
args = Symbol[]
for (i,T) in enumerate(argtypes)
arg = symbol("arg$i")
push!(body, :($arg = unsafe_load(convert(Ptr{$T}, p))))
push!(body, :(p += sizeof($T)))
push!(args, arg)
end
push!(body, :(ret = ccall($f, $rettype, ($(argtypes...),), $(args...))))
push!(body, :(unsafe_store!(convert(Ptr{$rettype}, retval_ptr), ret)))
push!(body, :(return sizeof($rettype)))

# return code to generate wrapper function and send work request thread queue
:(let
$wrapper
do_threadcall(wrapper, $rettype, Any[$(argtypes...)], Any[$(argvals...)])
end)
end

function do_threadcall(wrapper::Function, rettype::Type, argtypes::Vector, argvals::Vector)
# generate function pointer
fun_ptr = cfunction(wrapper, Int, (Ptr{Void}, Ptr{Void}))

# cconvert, root and unsafe_convert arguments
roots = Any[]
args_arr = Array{UInt8}(sum(sizeof, argtypes))
ptr = pointer(args_arr)
for (T, x) in zip(argtypes, argvals)
y = cconvert(T, x)
push!(roots, y)
unsafe_store!(convert(Ptr{T}, ptr), unsafe_convert(T, y))
end

# create return buffer
ret_arr = Array(UInt8, sizeof(rettype))

# wait for a worker thread to be available
acquire(threadcall_restrictor)
idx = findfirst(isnull, thread_notifiers)
thread_notifiers[idx] = Nullable{Condition}(Condition())

# queue up the work to be done
ccall(:jl_queue_work, Void,
(Ptr{Void}, Ptr{UInt8}, Ptr{UInt8}, Ptr{Void}, Cint),
fun_ptr, args_arr, ret_arr, c_notify_fun, idx)

# wait for a result & return it
wait(get(thread_notifiers[idx]))
thread_notifiers[idx] = Nullable{Condition}()
release(threadcall_restrictor)

unsafe_load(convert(Ptr{rettype}, pointer(ret_arr)))
end
45 changes: 45 additions & 0 deletions src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,51 @@ JL_DLLEXPORT int jl_tty_set_mode(uv_tty_t *handle, int mode)
return uv_tty_set_mode(handle, mode);
}

typedef int (*work_cb_t)(void *, void *);
typedef void (*notify_cb_t)(int);

struct work_baton {
uv_work_t req;
work_cb_t work_func;
void *work_args;
void *work_retval;
notify_cb_t notify_func;
pid_t tid;
int notify_idx;
};

#ifdef _OS_LINUX_
#include <sys/syscall.h>
#endif

void jl_work_wrapper(uv_work_t *req) {
struct work_baton *baton = (struct work_baton*) req->data;
baton->work_func(baton->work_args, baton->work_retval);
}

void jl_work_notifier(uv_work_t *req, int status) {
struct work_baton *baton = (struct work_baton*) req->data;
baton->notify_func(baton->notify_idx);
free(baton);
}

JL_DLLEXPORT int jl_queue_work(
void *work_func, void *work_args, void *work_retval,
void *notify_func, int notify_idx)
{
struct work_baton *baton = (struct work_baton*) malloc(sizeof(struct work_baton));
baton->req.data = (void*) baton;
baton->work_func = work_func;
baton->work_args = work_args;
baton->work_retval = work_retval;
baton->notify_func = notify_func;
baton->notify_idx = notify_idx;

uv_queue_work(jl_io_loop, &baton->req, jl_work_wrapper, jl_work_notifier);

return 0;
}

#ifndef _OS_WINDOWS_
#if defined(__APPLE__)
int uv___stream_fd(uv_stream_t *handle);
Expand Down
12 changes: 12 additions & 0 deletions test/ccall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,15 @@ ccall(foo13031p, Cint, (Ref{Tuple{}},), ())
foo13031(x,y,z) = z
foo13031p = cfunction(foo13031, Cint, (Ref{Tuple{}},Ref{Tuple{}},Cint))
ccall(foo13031p, Cint, (Ref{Tuple{}},Ref{Tuple{}},Cint), (), (), 8)

# @threadcall functionality
threadcall_test_func(x) =
@threadcall((:testUcharX, libccalltest), Int32, (UInt8,), x % UInt8)

@test threadcall_test_func(3) == 1
@test threadcall_test_func(259) == 1

@test 1.5 > @elapsed @sync for i = 1:4
@unix_only @async @threadcall(:sleep, Cuint, (Cuint,), 1)
@windows_only @async @threadcall(:Sleep, Void, (UInt32,), 1000)
end

0 comments on commit 0ea14f8

Please sign in to comment.