Skip to content

Commit

Permalink
parallelize numbuf memcpy and plasma object hash construction (ray-project#366)
Browse files Browse the repository at this point in the history

* parallelizing memcopy and object hash construction in numbuf/plasma

* clang format

* whitespace

* refactoring compute object hash: get rid of the prefix chunk

* clang format

* Document performance optimization.

* Remove check for 64-byte alignment, since it may not be guaranteed.
  • Loading branch information
atumanov authored and robertnishihara committed Mar 21, 2017
1 parent ba02fc0 commit a3d5860
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 21 deletions.
8 changes: 4 additions & 4 deletions src/numbuf/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ if(APPLE)
endif(APPLE)

if(APPLE)
target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES})
target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES} -lpthread)
else()
target_link_libraries(numbuf -Wl,--whole-archive ${ARROW_LIB} -Wl,--no-whole-archive ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES})
target_link_libraries(numbuf -Wl,--whole-archive ${ARROW_LIB} -Wl,--no-whole-archive ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES} -lpthread)
endif()

if(HAS_PLASMA)
target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES} plasma_lib common ${FLATBUFFERS_STATIC_LIB})
target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES} plasma_lib common ${FLATBUFFERS_STATIC_LIB} -lpthread)
else()
target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES})
target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES} -lpthread)
endif()

install(TARGETS numbuf DESTINATION ${CMAKE_SOURCE_DIR}/numbuf/)
103 changes: 101 additions & 2 deletions src/numbuf/python/src/pynumbuf/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,113 @@

#include <arrow/io/interfaces.h>

/* C++ includes */
#include <string>
#include <thread>
#include <vector>

#define THREADPOOL_SIZE 8
#define MEMCOPY_BLOCK_SIZE 64
#define BYTES_IN_MB (1 << 20)

using namespace std;

namespace numbuf {

class ParallelMemcopy {
 public:
  /**
   * Multithreaded memcpy helper. Payloads of at least one megabyte are split
   * into block-aligned chunks that are copied concurrently by a pool of
   * threads; smaller payloads fall back to a plain memcpy.
   *
   * @param block_size Desired alignment in bytes. Must be a power of two.
   * @param timeit If true, each memcopy call prints its measured bandwidth.
   * @param threadpool_size Number of worker threads for parallel copies.
   */
  explicit ParallelMemcopy(uint64_t block_size, bool timeit, int threadpool_size)
      : timeit_(timeit),
        block_size_(block_size),
        threadpool_size_(threadpool_size),
        threadpool_(threadpool_size) {}

  /**
   * Copy nbytes from src to dst. Large copies (>= 1 MB) use the internal
   * threadpool; small copies are done inline on the calling thread.
   * Not thread-safe: concurrent calls on the same instance would race on the
   * shared threadpool.
   */
  void memcopy(uint8_t* dst, const uint8_t* src, uint64_t nbytes) {
    struct timeval tv1, tv2;
    if (timeit_) {
      // Start the timer.
      gettimeofday(&tv1, NULL);
    }
    if (nbytes >= kBytesInMB) {
      // Large payload: worth the thread fork/join overhead.
      memcopy_aligned(dst, src, nbytes, block_size_);
    } else {
      memcpy(dst, src, nbytes);
    }
    if (timeit_) {
      // Stop the timer and log the measured time.
      gettimeofday(&tv2, NULL);
      double elapsed =
          ((tv2.tv_sec - tv1.tv_sec) * 1000000 + (tv2.tv_usec - tv1.tv_usec)) / 1000000.0;
      // Cast to unsigned long long so %llu is correct on platforms where
      // uint64_t is unsigned long.
      // TODO: replace this with ARROW_LOG(ARROW_INFO) or better equivalent.
      printf("Copied %llu bytes in time = %8.4f MBps = %8.4f\n",
          static_cast<unsigned long long>(nbytes), elapsed,
          nbytes / (kBytesInMB * elapsed));
    }
  }

  ~ParallelMemcopy() {
    // Join threadpool threads just in case they are still running.
    for (auto& t : threadpool_) {
      if (t.joinable()) { t.join(); }
    }
  }

 private:
  /** Number of bytes in a megabyte; the threshold for parallel copies. */
  static constexpr uint64_t kBytesInMB = 1 << 20;
  /** Controls whether the memcopy operations are timed. */
  bool timeit_;
  /** Specifies the desired alignment in bytes, as a power of 2. */
  uint64_t block_size_;
  /** Number of threads to be used for parallel memcopy operations. */
  int threadpool_size_;
  /** Internal threadpool to be used in the fork/join pattern. */
  std::vector<std::thread> threadpool_;

  /**
   * Parallel copy of a large buffer. The source is partitioned as
   * | prefix | num_threads * chunk_size | suffix |, where chunk_size is a
   * whole number of blocks; each worker thread copies one chunk while the
   * unaligned prefix and the leftover suffix are copied on this thread.
   */
  void memcopy_aligned(
      uint8_t* dst, const uint8_t* src, uint64_t nbytes, uint64_t block_size) {
    const uint64_t num_threads = threadpool_size_;
    // NOTE: assumes pointers fit in 64 bits (true on supported platforms).
    uint64_t src_address = reinterpret_cast<uint64_t>(src);
    // First block-aligned address at or after src.
    uint64_t left_address = (src_address + block_size - 1) & ~(block_size - 1);
    // Last block-aligned address at or before src + nbytes.
    uint64_t right_address = (src_address + nbytes) & ~(block_size - 1);
    uint64_t num_blocks = (right_address - left_address) / block_size;
    // Shrink the parallel region so every thread copies the same whole number
    // of blocks; the remainder is handled on the main thread below.
    right_address = right_address - (num_blocks % num_threads) * block_size;

    uint64_t chunk_size = (right_address - left_address) / num_threads;
    uint64_t prefix = left_address - src_address;
    uint64_t suffix = src_address + nbytes - right_address;
    // Now the data layout is | prefix | num_threads * chunk_size | suffix |.

    // Start all threads first and handle leftovers while threads run.
    for (uint64_t i = 0; i < num_threads; i++) {
      uint8_t* chunk_dst = dst + prefix + i * chunk_size;
      const uint8_t* chunk_src =
          reinterpret_cast<const uint8_t*>(left_address) + i * chunk_size;
      // Wrap memcpy in a lambda: handing the address of a standard library
      // function to std::thread is not guaranteed to be well-formed.
      threadpool_[i] = std::thread([chunk_dst, chunk_src, chunk_size] {
        memcpy(chunk_dst, chunk_src, chunk_size);
      });
    }

    // Copy the unaligned prefix and the leftover suffix on this thread.
    memcpy(dst, src, prefix);
    memcpy(dst + prefix + num_threads * chunk_size,
        reinterpret_cast<const uint8_t*>(right_address), suffix);

    for (auto& t : threadpool_) {
      if (t.joinable()) { t.join(); }
    }
  }
};

class FixedBufferStream : public arrow::io::OutputStream,
public arrow::io::ReadableFileInterface {
public:
virtual ~FixedBufferStream() {}

explicit FixedBufferStream(uint8_t* data, int64_t nbytes)
: data_(data), position_(0), size_(nbytes) {}
: data_(data),
position_(0),
size_(nbytes),
memcopy_helper(MEMCOPY_BLOCK_SIZE, false, THREADPOOL_SIZE) {}

arrow::Status Read(int64_t nbytes, std::shared_ptr<arrow::Buffer>* out) override {
DCHECK(out);
Expand Down Expand Up @@ -44,7 +142,7 @@ class FixedBufferStream : public arrow::io::OutputStream,
DCHECK(position_ + nbytes <= size_) << "position: " << position_
<< " nbytes: " << nbytes << "size: " << size_;
uint8_t* dst = data_ + position_;
memcpy(dst, data, nbytes);
memcopy_helper.memcopy(dst, data, nbytes);
position_ += nbytes;
return arrow::Status::OK();
}
Expand All @@ -60,6 +158,7 @@ class FixedBufferStream : public arrow::io::OutputStream,
uint8_t* data_;
int64_t position_;
int64_t size_;
ParallelMemcopy memcopy_helper;
};

class MockBufferStream : public arrow::io::OutputStream {
Expand Down
6 changes: 3 additions & 3 deletions src/plasma/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ if(APPLE)
endif(APPLE)

if(APPLE)
target_link_libraries(plasma -Wl,-force_load,${FLATBUFFERS_STATIC_LIB} common ${PYTHON_LIBRARIES} ${FLATBUFFERS_STATIC_LIB})
target_link_libraries(plasma -Wl,-force_load,${FLATBUFFERS_STATIC_LIB} common ${PYTHON_LIBRARIES} ${FLATBUFFERS_STATIC_LIB} -lpthread)
else(APPLE)
target_link_libraries(plasma -Wl,--whole-archive ${FLATBUFFERS_STATIC_LIB} -Wl,--no-whole-archive common ${PYTHON_LIBRARIES} ${FLATBUFFERS_STATIC_LIB})
target_link_libraries(plasma -Wl,--whole-archive ${FLATBUFFERS_STATIC_LIB} -Wl,--no-whole-archive common ${PYTHON_LIBRARIES} ${FLATBUFFERS_STATIC_LIB} -lpthread)
endif(APPLE)

include_directories("${FLATBUFFERS_INCLUDE_DIR}")
Expand Down Expand Up @@ -87,7 +87,7 @@ add_library(plasma_lib STATIC
fling.c
thirdparty/xxhash.c)

target_link_libraries(plasma_lib common ${FLATBUFFERS_STATIC_LIB})
target_link_libraries(plasma_lib common ${FLATBUFFERS_STATIC_LIB} -lpthread)
add_dependencies(plasma_lib gen_plasma_fbs)

add_dependencies(plasma protocol_fbs)
Expand Down
3 changes: 3 additions & 0 deletions src/plasma/plasma.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
#include "utarray.h"
#include "uthash.h"

/** Allocation granularity used in plasma for object allocation. */
#define BLOCK_SIZE 64

/**
* Object request data structure. Used in the plasma_wait_for_objects()
* argument.
Expand Down
82 changes: 72 additions & 10 deletions src/plasma/plasma_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
#include "uthash.h"
#include "utlist.h"

/* C++ includes */
#include <vector>
#include <thread>

extern "C" {
#include "sha256.h"
#include "fling.h"
Expand All @@ -38,6 +42,10 @@ extern "C" {
#define XXH64_DEFAULT_SEED 0
}

#define THREADPOOL_SIZE 8
#define BYTES_IN_MB (1 << 20)
static std::vector<std::thread> threadpool_(THREADPOOL_SIZE);

typedef struct {
/** Key that uniquely identifies the memory mapped file. In practice, we
* take the numerical value of the file descriptor in the object store. */
Expand Down Expand Up @@ -465,28 +473,82 @@ void plasma_contains(PlasmaConnection *conn, ObjectID obj_id, int *has_object) {
}
}

static void compute_block_hash(const unsigned char *data,
int64_t nbytes,
uint64_t *hash) {
XXH64_state_t hash_state;
XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
XXH64_update(&hash_state, data, nbytes);
*hash = XXH64_digest(&hash_state);
}

static inline bool compute_object_hash_parallel(XXH64_state_t *hash_state,
                                                const unsigned char *data,
                                                int64_t nbytes) {
  /* Hash a large buffer in parallel: the block-aligned prefix is split into
   * equal chunks hashed by worker threads, the remaining suffix is hashed on
   * this thread, and the per-chunk digests are then folded into hash_state.
   * The resulting digest therefore depends on THREADPOOL_SIZE and BLOCK_SIZE
   * and is not equal to a plain XXH64 of the buffer. Always returns true.
   * NOTE(review): this uses the file-static threadpool_, so concurrent calls
   * from multiple threads would race on it -- confirm callers are serialized.
   * Note that this function will likely be faster if the address of data is
   * aligned on a 64-byte boundary. */
  const uint64_t num_threads = THREADPOOL_SIZE;
  uint64_t threadhash[num_threads + 1];
  const uint64_t data_address = reinterpret_cast<uint64_t>(data);
  const uint64_t num_blocks = nbytes / BLOCK_SIZE;
  const uint64_t chunk_size = (num_blocks / num_threads) * BLOCK_SIZE;
  const uint64_t right_address = data_address + chunk_size * num_threads;
  const uint64_t suffix = (data_address + nbytes) - right_address;
  /* Now the data layout is | k * num_threads * block_size | suffix | ==
   * | num_threads * chunk_size | suffix |, where chunk_size = k * block_size.
   * Each thread gets a "chunk" of k blocks, except the suffix thread. */

  /* Use an unsigned loop index to match num_threads and avoid a
   * sign-compare warning. */
  for (uint64_t i = 0; i < num_threads; i++) {
    threadpool_[i] =
        std::thread(compute_block_hash,
                    reinterpret_cast<uint8_t *>(data_address) + i * chunk_size,
                    chunk_size, &threadhash[i]);
  }
  /* Hash the suffix on the calling thread while the workers run. */
  compute_block_hash(reinterpret_cast<uint8_t *>(right_address), suffix,
                     &threadhash[num_threads]);

  /* Join the threads. */
  for (auto &t : threadpool_) {
    if (t.joinable()) {
      t.join();
    }
  }

  XXH64_update(hash_state, (unsigned char *) threadhash, sizeof(threadhash));
  return true;
}

static uint64_t compute_object_hash(const ObjectBuffer &obj_buffer) {
XXH64_state_t hash_state;
XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
if (obj_buffer.data_size >= BYTES_IN_MB) {
compute_object_hash_parallel(&hash_state, (unsigned char *) obj_buffer.data,
obj_buffer.data_size);
} else {
XXH64_update(&hash_state, (unsigned char *) obj_buffer.data,
obj_buffer.data_size);
}
XXH64_update(&hash_state, (unsigned char *) obj_buffer.metadata,
obj_buffer.metadata_size);
return XXH64_digest(&hash_state);
}

bool plasma_compute_object_hash(PlasmaConnection *conn,
ObjectID obj_id,
unsigned char *digest) {
/* Get the plasma object data. We pass in a timeout of 0 to indicate that
* the operation should timeout immediately. */
ObjectBuffer obj_buffer;
ObjectID obj_id_array[1] = {obj_id};
uint64_t hash;

plasma_get(conn, obj_id_array, 1, 0, &obj_buffer);
/* If the object was not retrieved, return false. */
if (obj_buffer.data_size == -1) {
return false;
}
/* Compute the hash. */
XXH64_state_t hash_state;
XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
XXH64_update(&hash_state, (unsigned char *) obj_buffer.data,
obj_buffer.data_size);
XXH64_update(&hash_state, (unsigned char *) obj_buffer.metadata,
obj_buffer.metadata_size);
uint64_t hash = XXH64_digest(&hash_state);
DCHECK(DIGEST_SIZE >= sizeof(hash));
memset(digest, 0, DIGEST_SIZE);
hash = compute_object_hash(obj_buffer);
memcpy(digest, &hash, sizeof(hash));
/* Release the plasma object. */
plasma_release(conn, obj_id);
Expand All @@ -505,7 +567,7 @@ void plasma_seal(PlasmaConnection *conn, ObjectID object_id) {
"Plasma client called seal an already sealed object");
object_entry->is_sealed = true;
/* Send the seal request to Plasma. */
unsigned char digest[DIGEST_SIZE];
static unsigned char digest[DIGEST_SIZE];
CHECK(plasma_compute_object_hash(conn, object_id, &digest[0]));
CHECK(plasma_send_SealRequest(conn->store_conn, conn->builder, object_id,
&digest[0]) >= 0);
Expand Down
12 changes: 10 additions & 2 deletions src/plasma/plasma_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ extern "C" {
#include "fling.h"
#include "malloc.h"
void *dlmalloc(size_t);
void *dlmemalign(size_t alignment, size_t bytes);
void dlfree(void *);
}

Expand Down Expand Up @@ -230,8 +231,15 @@ int create_object(Client *client_context,
if (!success) {
return PlasmaError_OutOfMemory;
}
/* Allocate space for the new object */
uint8_t *pointer = (uint8_t *) dlmalloc(data_size + metadata_size);
/* Allocate space for the new object. We use dlmemalign instead of dlmalloc in
* order to align the allocated region to a 64-byte boundary. This is not
* strictly necessary, but it is an optimization that could speed up the
* computation of a hash of the data (see compute_object_hash_parallel in
* plasma_client.cc). Note that even though this pointer is 64-byte aligned,
* it is not guaranteed that the corresponding pointer in the client will be
* 64-byte aligned, but in practice it often will be. */
uint8_t *pointer =
(uint8_t *) dlmemalign(BLOCK_SIZE, data_size + metadata_size);
int fd;
int64_t map_size;
ptrdiff_t offset;
Expand Down

0 comments on commit a3d5860

Please sign in to comment.