Skip to content

Commit

Permalink
win: implement uv_try_write() for pipes(libuv#3825 1/2)
Browse files Browse the repository at this point in the history
  • Loading branch information
bnoordhuis authored and santigimeno committed Dec 1, 2022
1 parent 1a91508 commit 244e0e2
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 1 deletion.
1 change: 1 addition & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \
test/test-pipe-close-stdout-read-stdin.c \
test/test-pipe-set-non-blocking.c \
test/test-pipe-set-fchmod.c \
test/test-pipe-try-write.c \
test/test-platform-output.c \
test/test-poll.c \
test/test-poll-close.c \
Expand Down
2 changes: 2 additions & 0 deletions src/win/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ int uv__pipe_write(uv_loop_t* loop,
uv_stream_t* send_handle,
uv_write_cb cb);
void uv__pipe_shutdown(uv_loop_t* loop, uv_pipe_t* handle, uv_shutdown_t* req);
int uv__pipe_try_write(uv_pipe_t* handle, const uv_buf_t bufs[],
unsigned int nbufs);

void uv__process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_req_t* req);
Expand Down
72 changes: 72 additions & 0 deletions src/win/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -1782,6 +1782,78 @@ int uv__pipe_write(uv_loop_t* loop,
}


int uv__pipe_try_write(uv_pipe_t* handle,
const uv_buf_t bufs[],
unsigned int nbufs) {
OVERLAPPED overlapped;
const uv_buf_t* buf;
int bytes_written;
unsigned int idx;
DWORD timeout;
DWORD err;

if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
return UV_EAGAIN;
}

if (handle->stream.conn.write_reqs_pending > 0) {
return UV_EAGAIN;
}

timeout = 0;
if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
timeout = INFINITE;
}

memset(&overlapped, 0, sizeof(overlapped));

overlapped.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
if (overlapped.hEvent == NULL) {
uv_fatal_error(GetLastError(), "CreateEvent");
}

bytes_written = 0;
for (err = 0, idx = 0; idx < nbufs; err = 0, idx += 1) {
buf = &bufs[idx];

if (WriteFile(handle->handle, buf->base, buf->len, NULL, &overlapped)) {
bytes_written += buf->len;
continue;
}

err = GetLastError();
if (err != ERROR_IO_PENDING) {
break;
}

err = WaitForSingleObject(overlapped.hEvent, timeout);
if (err == WAIT_OBJECT_0) {
bytes_written += buf->len;
continue;
}

if (err == WAIT_TIMEOUT &&
CancelIo(handle->handle) &&
GetOverlappedResult(handle->handle, &overlapped, &err, TRUE)) {
bytes_written += err;
err = WSAEWOULDBLOCK; /* Translates to UV_EAGAIN. */
} else {
err = GetLastError();
}

break;
}

CloseHandle(overlapped.hEvent);

if (bytes_written == 0 && err != 0) {
return uv_translate_sys_error(err);
}

return bytes_written;
}


static void uv__pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
uv_buf_t buf) {
/* If there is an eof timer running, we don't need it any more, so discard
Expand Down
2 changes: 1 addition & 1 deletion src/win/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ int uv_try_write(uv_stream_t* stream,
case UV_TTY:
return uv__tty_try_write((uv_tty_t*) stream, bufs, nbufs);
case UV_NAMED_PIPE:
return UV_EAGAIN;
return uv__pipe_try_write((uv_pipe_t*) stream, bufs, nbufs);
default:
assert(0);
return UV_ENOSYS;
Expand Down
6 changes: 6 additions & 0 deletions test/test-list.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ TEST_DECLARE (pipe_getsockname_blocking)
TEST_DECLARE (pipe_pending_instances)
TEST_DECLARE (pipe_sendmsg)
TEST_DECLARE (pipe_server_close)
TEST_DECLARE (pipe_try_write_0)
TEST_DECLARE (pipe_try_write_1)
TEST_DECLARE (pipe_try_write_2)
TEST_DECLARE (connection_fail)
TEST_DECLARE (connection_fail_doesnt_auto_close)
TEST_DECLARE (shutdown_close_tcp)
Expand Down Expand Up @@ -587,6 +590,9 @@ TASK_LIST_START
TEST_ENTRY (pipe_connect_on_prepare)

TEST_ENTRY (pipe_server_close)
TEST_ENTRY (pipe_try_write_0)
TEST_ENTRY (pipe_try_write_1)
TEST_ENTRY (pipe_try_write_2)
#ifndef _WIN32
TEST_ENTRY (pipe_close_stdout_read_stdin)
#endif
Expand Down
125 changes: 125 additions & 0 deletions test/test-pipe-try-write.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#include "uv.h"
#include "task.h"

static void (*spam)(uv_pipe_t* handle);
static uv_pipe_t client_handle;
static uv_pipe_t peer_handle;
static uv_pipe_t server_handle;
static uv_write_t write_req;


static void write_cb(uv_write_t* req, int status) {
ASSERT(0 == status);
}


static void spam_0(uv_pipe_t* handle) {
uv_buf_t buf;

buf = uv_buf_init("", 0);
ASSERT(0 == uv_try_write((uv_stream_t*) handle, &buf, 1));

/* Non-empty write to start the event loop moving. */
buf = uv_buf_init("hello, world", sizeof("hello, world") - 1);
ASSERT(0 == uv_write(&write_req, (uv_stream_t*) handle, &buf, 1, write_cb));
}


static void spam_1(uv_pipe_t* handle) {
uv_buf_t buf;
int rc;

buf = uv_buf_init("hello, world", sizeof("hello, world") - 1);
do
rc = uv_try_write((uv_stream_t*) handle, &buf, 1);
while (rc > 0);

ASSERT(rc == UV_EAGAIN);
}


static void spam_2(uv_pipe_t* handle) {
uv_buf_t bufs[2];
int rc;

bufs[0] = uv_buf_init("hello,", sizeof("hello,") - 1);
bufs[1] = uv_buf_init(" world", sizeof(" world") - 1);

do
rc = uv_try_write((uv_stream_t*) handle, bufs, ARRAY_SIZE(bufs));
while (rc > 0);

ASSERT(rc == UV_EAGAIN);
}


static void alloc_cb(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
static char base[256];

buf->base = base;
buf->len = sizeof(base);
}


static void read_cb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
if (spam == spam_0) {
ASSERT(nread > 0); /* Expect some bytes. */
} else {
ASSERT(nread == (ssize_t) buf->len); /* Expect saturation. */
}

if (handle == (uv_stream_t*) &peer_handle) {
spam(&client_handle);
} else {
uv_close((uv_handle_t*) &peer_handle, NULL);
uv_close((uv_handle_t*) &client_handle, NULL);
uv_close((uv_handle_t*) &server_handle, NULL);
}
}


static void connection_cb(uv_stream_t* handle, int status) {
ASSERT(0 == status);
ASSERT(0 == uv_pipe_init(uv_default_loop(), &peer_handle, 0));
ASSERT(0 == uv_accept((uv_stream_t*) &server_handle,
(uv_stream_t*) &peer_handle));
ASSERT(0 == uv_read_start((uv_stream_t*) &peer_handle, alloc_cb, read_cb));
spam(&peer_handle);
}


static void connect_cb(uv_connect_t* req, int status) {
ASSERT(0 == status);
ASSERT(0 == uv_read_start((uv_stream_t*) &client_handle, alloc_cb, read_cb));
}


static int pipe_try_write(void (*spammer)(uv_pipe_t*)) {
uv_connect_t connect_req;

spam = spammer;
ASSERT(0 == uv_pipe_init(uv_default_loop(), &client_handle, 0));
ASSERT(0 == uv_pipe_init(uv_default_loop(), &server_handle, 0));
ASSERT(0 == uv_pipe_bind(&server_handle, TEST_PIPENAME));
ASSERT(0 == uv_listen((uv_stream_t*) &server_handle, 1, connection_cb));
uv_pipe_connect(&connect_req, &client_handle, TEST_PIPENAME, connect_cb);
ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_DEFAULT));

MAKE_VALGRIND_HAPPY();
return 0;
}


TEST_IMPL(pipe_try_write_0) {
return pipe_try_write(spam_0);
}


TEST_IMPL(pipe_try_write_1) {
return pipe_try_write(spam_1);
}


TEST_IMPL(pipe_try_write_2) {
return pipe_try_write(spam_2);
}

0 comments on commit 244e0e2

Please sign in to comment.