Skip to content

Commit

Permalink
lib,src: improve writev() performance for Buffers
Browse files Browse the repository at this point in the history
PR-URL: nodejs#13187
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Anna Henningsen <[email protected]>
  • Loading branch information
mscdex committed May 26, 2017
1 parent 112ef23 commit 01a1022
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 70 deletions.
13 changes: 12 additions & 1 deletion lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
if (!isBuf) {
var newChunk = decodeChunk(state, chunk, encoding);
if (chunk !== newChunk) {
isBuf = true;
encoding = 'buffer';
chunk = newChunk;
}
Expand All @@ -335,7 +336,13 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {

if (state.writing || state.corked) {
var last = state.lastBufferedRequest;
state.lastBufferedRequest = { chunk, encoding, callback: cb, next: null };
state.lastBufferedRequest = {
chunk,
encoding,
isBuf,
callback: cb,
next: null
};
if (last) {
last.next = state.lastBufferedRequest;
} else {
Expand Down Expand Up @@ -438,11 +445,15 @@ function clearBuffer(stream, state) {
holder.entry = entry;

var count = 0;
var allBuffers = true;
while (entry) {
buffer[count] = entry;
if (!entry.isBuf)
allBuffers = false;
entry = entry.next;
count += 1;
}
buffer.allBuffers = allBuffers;

doWrite(stream, state, true, state.length, buffer, '', holder.finish);

Expand Down
21 changes: 15 additions & 6 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -726,13 +726,22 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
var err;

if (writev) {
var chunks = new Array(data.length << 1);
for (var i = 0; i < data.length; i++) {
var entry = data[i];
chunks[i * 2] = entry.chunk;
chunks[i * 2 + 1] = entry.encoding;
var allBuffers = data.allBuffers;
var chunks;
var i;
if (allBuffers) {
chunks = data;
for (i = 0; i < data.length; i++)
data[i] = data[i].chunk;
} else {
chunks = new Array(data.length << 1);
for (i = 0; i < data.length; i++) {
var entry = data[i];
chunks[i * 2] = entry.chunk;
chunks[i * 2 + 1] = entry.encoding;
}
}
err = this._handle.writev(req, chunks);
err = this._handle.writev(req, chunks, allBuffers);

// Retain chunks
if (err === 0) req._chunks = chunks;
Expand Down
150 changes: 87 additions & 63 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,92 +100,116 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {

Local<Object> req_wrap_obj = args[0].As<Object>();
Local<Array> chunks = args[1].As<Array>();
bool all_buffers = args[2]->IsTrue();

size_t count = chunks->Length() >> 1;
size_t count;
if (all_buffers)
count = chunks->Length();
else
count = chunks->Length() >> 1;

MaybeStackBuffer<uv_buf_t, 16> bufs(count);
uv_buf_t* buf_list = *bufs;

// Determine storage size first
size_t storage_size = 0;
for (size_t i = 0; i < count; i++) {
storage_size = ROUND_UP(storage_size, WriteWrap::kAlignSize);

Local<Value> chunk = chunks->Get(i * 2);

if (Buffer::HasInstance(chunk))
continue;
// Buffer chunk, no additional storage required

// String chunk
Local<String> string = chunk->ToString(env->isolate());
enum encoding encoding = ParseEncoding(env->isolate(),
chunks->Get(i * 2 + 1));
size_t chunk_size;
if (encoding == UTF8 && string->Length() > 65535)
chunk_size = StringBytes::Size(env->isolate(), string, encoding);
else
chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding);

storage_size += chunk_size;
}
uint32_t bytes = 0;
size_t offset;
AsyncWrap* wrap;
WriteWrap* req_wrap;
int err;

if (storage_size > INT_MAX)
return UV_ENOBUFS;
if (!all_buffers) {
// Determine storage size first
for (size_t i = 0; i < count; i++) {
storage_size = ROUND_UP(storage_size, WriteWrap::kAlignSize);

AsyncWrap* wrap = GetAsyncWrap();
CHECK_NE(wrap, nullptr);
env->set_init_trigger_id(wrap->get_id());
WriteWrap* req_wrap = WriteWrap::New(env,
req_wrap_obj,
this,
AfterWrite,
storage_size);
Local<Value> chunk = chunks->Get(i * 2);

uint32_t bytes = 0;
size_t offset = 0;
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(i * 2);
if (Buffer::HasInstance(chunk))
continue;
// Buffer chunk, no additional storage required

// String chunk
Local<String> string = chunk->ToString(env->isolate());
enum encoding encoding = ParseEncoding(env->isolate(),
chunks->Get(i * 2 + 1));
size_t chunk_size;
if (encoding == UTF8 && string->Length() > 65535)
chunk_size = StringBytes::Size(env->isolate(), string, encoding);
else
chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding);

// Write buffer
if (Buffer::HasInstance(chunk)) {
storage_size += chunk_size;
}

if (storage_size > INT_MAX)
return UV_ENOBUFS;
} else {
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(i);
bufs[i].base = Buffer::Data(chunk);
bufs[i].len = Buffer::Length(chunk);
bytes += bufs[i].len;
continue;
}

// Write string
offset = ROUND_UP(offset, WriteWrap::kAlignSize);
CHECK_LE(offset, storage_size);
char* str_storage = req_wrap->Extra(offset);
size_t str_size = storage_size - offset;

Local<String> string = chunk->ToString(env->isolate());
enum encoding encoding = ParseEncoding(env->isolate(),
chunks->Get(i * 2 + 1));
str_size = StringBytes::Write(env->isolate(),
str_storage,
str_size,
string,
encoding);
bufs[i].base = str_storage;
bufs[i].len = str_size;
offset += str_size;
bytes += str_size;
// Try writing immediately without allocation
err = DoTryWrite(&buf_list, &count);
if (err != 0 || count == 0)
goto done;
}

int err = DoWrite(req_wrap, *bufs, count, nullptr);
wrap = GetAsyncWrap();
CHECK_NE(wrap, nullptr);
env->set_init_trigger_id(wrap->get_id());
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size);

offset = 0;
if (!all_buffers) {
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(i * 2);

// Write buffer
if (Buffer::HasInstance(chunk)) {
bufs[i].base = Buffer::Data(chunk);
bufs[i].len = Buffer::Length(chunk);
bytes += bufs[i].len;
continue;
}

// Write string
offset = ROUND_UP(offset, WriteWrap::kAlignSize);
CHECK_LE(offset, storage_size);
char* str_storage = req_wrap->Extra(offset);
size_t str_size = storage_size - offset;

Local<String> string = chunk->ToString(env->isolate());
enum encoding encoding = ParseEncoding(env->isolate(),
chunks->Get(i * 2 + 1));
str_size = StringBytes::Write(env->isolate(),
str_storage,
str_size,
string,
encoding);
bufs[i].base = str_storage;
bufs[i].len = str_size;
offset += str_size;
bytes += str_size;
}
}

err = DoWrite(req_wrap, buf_list, count, nullptr);
req_wrap_obj->Set(env->async(), True(env->isolate()));
req_wrap_obj->Set(env->bytes_string(), Number::New(env->isolate(), bytes));

if (err)
req_wrap->Dispose();

done:
const char* msg = Error();
if (msg != nullptr) {
req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
ClearError();
}

if (err)
req_wrap->Dispose();
req_wrap_obj->Set(env->bytes_string(), Number::New(env->isolate(), bytes));

return err;
}
Expand Down

0 comments on commit 01a1022

Please sign in to comment.