Skip to content

Commit

Permalink
ARROW-15579: [C++] Add MemoryManager::CopyBuffer(const Buffer&)
Browse files Browse the repository at this point in the history
This is useful to implement, for instance, a copy from a void* (from an external API) to a buffer on a different device (the current API would require allocating a shared_ptr, and provides no way to get back a unique_ptr). Note that while we have the BufferWriter API, that also fails (because it needs ownership of the buffer to write to, and there isn't a good way to express "a temporary writer that does not have ownership of the target buffer").

Closes apache#12341 from lidavidm/arrow-15579

Lead-authored-by: David Li <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
lidavidm and pitrou committed Feb 17, 2022
1 parent 17fcbdb commit d94365f
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 28 deletions.
5 changes: 5 additions & 0 deletions cpp/src/arrow/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ Result<std::shared_ptr<Buffer>> Buffer::Copy(std::shared_ptr<Buffer> source,
return MemoryManager::CopyBuffer(source, to);
}

Result<std::unique_ptr<Buffer>> Buffer::CopyNonOwned(
const Buffer& source, const std::shared_ptr<MemoryManager>& to) {
return MemoryManager::CopyNonOwned(source, to);
}

Result<std::shared_ptr<Buffer>> Buffer::View(std::shared_ptr<Buffer> source,
const std::shared_ptr<MemoryManager>& to) {
return MemoryManager::ViewBuffer(source, to);
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,13 @@ class ARROW_EXPORT Buffer {
static Result<std::shared_ptr<Buffer>> Copy(std::shared_ptr<Buffer> source,
const std::shared_ptr<MemoryManager>& to);

/// \brief Copy a non-owned buffer
///
/// This is useful for cases where the source memory area is externally managed
/// (its lifetime not tied to the source Buffer), otherwise please use Copy().
static Result<std::unique_ptr<Buffer>> CopyNonOwned(
const Buffer& source, const std::shared_ptr<MemoryManager>& to);

/// \brief View buffer
///
/// Return a Buffer that reflects this buffer, seen potentially from another
Expand Down
50 changes: 44 additions & 6 deletions cpp/src/arrow/buffer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/make_unique.h"

namespace arrow {

Expand Down Expand Up @@ -115,6 +116,10 @@ class MyMemoryManager : public MemoryManager {
Result<std::shared_ptr<Buffer>> CopyBufferTo(
const std::shared_ptr<Buffer>& buf,
const std::shared_ptr<MemoryManager>& to) override;
Result<std::unique_ptr<Buffer>> CopyNonOwnedFrom(
const Buffer& buf, const std::shared_ptr<MemoryManager>& from) override;
Result<std::unique_ptr<Buffer>> CopyNonOwnedTo(
const Buffer& buf, const std::shared_ptr<MemoryManager>& to) override;
Result<std::shared_ptr<Buffer>> ViewBufferFrom(
const std::shared_ptr<Buffer>& buf,
const std::shared_ptr<MemoryManager>& from) override;
Expand All @@ -138,28 +143,38 @@ std::shared_ptr<MemoryManager> MyDevice::default_memory_manager() {

Result<std::shared_ptr<Buffer>> MyMemoryManager::CopyBufferFrom(
const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& from) {
return CopyNonOwnedFrom(*buf, from);
}

Result<std::shared_ptr<Buffer>> MyMemoryManager::CopyBufferTo(
const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
return CopyNonOwnedTo(*buf, to);
}

Result<std::unique_ptr<Buffer>> MyMemoryManager::CopyNonOwnedFrom(
const Buffer& buf, const std::shared_ptr<MemoryManager>& from) {
if (!allow_copy()) {
return nullptr;
}
if (from->is_cpu()) {
// CPU to MyDevice:
// 1. CPU to CPU
ARROW_ASSIGN_OR_RAISE(auto dest,
MemoryManager::CopyBuffer(buf, default_cpu_memory_manager()));
MemoryManager::CopyNonOwned(buf, default_cpu_memory_manager()));
// 2. Wrap CPU buffer result
return std::make_shared<MyBuffer>(shared_from_this(), dest);
return internal::make_unique<MyBuffer>(shared_from_this(), std::move(dest));
}
return nullptr;
}

Result<std::shared_ptr<Buffer>> MyMemoryManager::CopyBufferTo(
const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
Result<std::unique_ptr<Buffer>> MyMemoryManager::CopyNonOwnedTo(
const Buffer& buf, const std::shared_ptr<MemoryManager>& to) {
if (!allow_copy()) {
return nullptr;
}
if (to->is_cpu() && buf->parent()) {
if (to->is_cpu() && buf.parent()) {
// MyDevice to CPU
return MemoryManager::CopyBuffer(buf->parent(), to);
return MemoryManager::CopyNonOwned(*buf.parent(), to);
}
return nullptr;
}
Expand Down Expand Up @@ -245,6 +260,13 @@ TEST_F(TestDevice, Copy) {
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");

ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::CopyNonOwned(*cpu_src_, cpu_mm_));
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_NE(buffer->address(), cpu_src_->address());
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");

// CPU-to-device
ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::CopyBuffer(cpu_src_, my_copy_mm_));
ASSERT_EQ(buffer->device(), my_copy_device_);
Expand All @@ -255,6 +277,15 @@ TEST_F(TestDevice, Copy) {
#endif
AssertMyBufferEqual(*buffer, "some data");

ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::CopyNonOwned(*cpu_src_, my_copy_mm_));
ASSERT_EQ(buffer->device(), my_copy_device_);
ASSERT_FALSE(buffer->is_cpu());
ASSERT_NE(buffer->address(), cpu_src_->address());
#ifdef NDEBUG
ASSERT_EQ(buffer->data(), nullptr);
#endif
AssertMyBufferEqual(*buffer, "some data");

// Device-to-CPU
ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::CopyBuffer(my_copy_src_, cpu_mm_));
ASSERT_EQ(buffer->device(), cpu_device_);
Expand All @@ -263,6 +294,13 @@ TEST_F(TestDevice, Copy) {
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");

ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::CopyNonOwned(*my_copy_src_, cpu_mm_));
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_NE(buffer->address(), my_copy_src_->address());
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");

// Device-to-device with an intermediate CPU copy
ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::CopyBuffer(my_copy_src_, my_copy_mm_));
ASSERT_EQ(buffer->device(), my_copy_device_);
Expand Down
45 changes: 39 additions & 6 deletions cpp/src/arrow/device.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,19 @@ Result<std::shared_ptr<Buffer>> MemoryManager::CopyBuffer(
" to ", to->device()->ToString(), " not supported");
}

Result<std::unique_ptr<Buffer>> MemoryManager::CopyNonOwned(
const Buffer& buf, const std::shared_ptr<MemoryManager>& to) {
const auto& from = buf.memory_manager();
auto maybe_buffer = to->CopyNonOwnedFrom(buf, from);
COPY_BUFFER_RETURN(maybe_buffer, to);
// `to` doesn't support copying from `from`, try the other way
maybe_buffer = from->CopyNonOwnedTo(buf, to);
COPY_BUFFER_RETURN(maybe_buffer, to);

return Status::NotImplemented("Copying buffer from ", from->device()->ToString(),
" to ", to->device()->ToString(), " not supported");
}

Result<std::shared_ptr<Buffer>> MemoryManager::ViewBuffer(
const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
if (buf->memory_manager() == to) {
Expand Down Expand Up @@ -103,6 +116,16 @@ Result<std::shared_ptr<Buffer>> MemoryManager::CopyBufferTo(
return nullptr;
}

Result<std::unique_ptr<Buffer>> MemoryManager::CopyNonOwnedFrom(
const Buffer& buf, const std::shared_ptr<MemoryManager>& from) {
return nullptr;
}

Result<std::unique_ptr<Buffer>> MemoryManager::CopyNonOwnedTo(
const Buffer& buf, const std::shared_ptr<MemoryManager>& to) {
return nullptr;
}

Result<std::shared_ptr<Buffer>> MemoryManager::ViewBufferFrom(
const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& from) {
return nullptr;
Expand Down Expand Up @@ -141,12 +164,17 @@ Result<std::unique_ptr<Buffer>> CPUMemoryManager::AllocateBuffer(int64_t size) {

Result<std::shared_ptr<Buffer>> CPUMemoryManager::CopyBufferFrom(
const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& from) {
return CopyNonOwnedFrom(*buf, from);
}

Result<std::unique_ptr<Buffer>> CPUMemoryManager::CopyNonOwnedFrom(
const Buffer& buf, const std::shared_ptr<MemoryManager>& from) {
if (!from->is_cpu()) {
return nullptr;
}
ARROW_ASSIGN_OR_RAISE(auto dest, ::arrow::AllocateBuffer(buf->size(), pool_));
if (buf->size() > 0) {
memcpy(dest->mutable_data(), buf->data(), static_cast<size_t>(buf->size()));
ARROW_ASSIGN_OR_RAISE(auto dest, ::arrow::AllocateBuffer(buf.size(), pool_));
if (buf.size() > 0) {
memcpy(dest->mutable_data(), buf.data(), static_cast<size_t>(buf.size()));
}
return std::move(dest);
}
Expand All @@ -161,12 +189,17 @@ Result<std::shared_ptr<Buffer>> CPUMemoryManager::ViewBufferFrom(

Result<std::shared_ptr<Buffer>> CPUMemoryManager::CopyBufferTo(
const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
return CopyNonOwnedTo(*buf, to);
}

Result<std::unique_ptr<Buffer>> CPUMemoryManager::CopyNonOwnedTo(
const Buffer& buf, const std::shared_ptr<MemoryManager>& to) {
if (!to->is_cpu()) {
return nullptr;
}
ARROW_ASSIGN_OR_RAISE(auto dest, ::arrow::AllocateBuffer(buf->size(), pool_));
if (buf->size() > 0) {
memcpy(dest->mutable_data(), buf->data(), static_cast<size_t>(buf->size()));
ARROW_ASSIGN_OR_RAISE(auto dest, ::arrow::AllocateBuffer(buf.size(), pool_));
if (buf.size() > 0) {
memcpy(dest->mutable_data(), buf.data(), static_cast<size_t>(buf.size()));
}
return std::move(dest);
}
Expand Down
16 changes: 15 additions & 1 deletion cpp/src/arrow/device.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,19 @@ class ARROW_EXPORT MemoryManager : public std::enable_shared_from_this<MemoryMan
/// The buffer will be allocated in the device's memory.
virtual Result<std::unique_ptr<Buffer>> AllocateBuffer(int64_t size) = 0;

// XXX Should this take a `const Buffer&` instead
/// \brief Copy a Buffer to a destination MemoryManager
///
/// See also the Buffer::Copy shorthand.
static Result<std::shared_ptr<Buffer>> CopyBuffer(
const std::shared_ptr<Buffer>& source, const std::shared_ptr<MemoryManager>& to);

/// \brief Copy a non-owned Buffer to a destination MemoryManager
///
/// This is useful for cases where the source memory area is externally managed
/// (its lifetime not tied to the source Buffer), otherwise please use CopyBuffer().
static Result<std::unique_ptr<Buffer>> CopyNonOwned(
const Buffer& source, const std::shared_ptr<MemoryManager>& to);

/// \brief Make a no-copy Buffer view in a destination MemoryManager
///
/// See also the Buffer::View shorthand.
Expand All @@ -146,6 +152,10 @@ class ARROW_EXPORT MemoryManager : public std::enable_shared_from_this<MemoryMan
const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& from);
virtual Result<std::shared_ptr<Buffer>> CopyBufferTo(
const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to);
virtual Result<std::unique_ptr<Buffer>> CopyNonOwnedFrom(
const Buffer& buf, const std::shared_ptr<MemoryManager>& from);
virtual Result<std::unique_ptr<Buffer>> CopyNonOwnedTo(
const Buffer& buf, const std::shared_ptr<MemoryManager>& to);
virtual Result<std::shared_ptr<Buffer>> ViewBufferFrom(
const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& from);
virtual Result<std::shared_ptr<Buffer>> ViewBufferTo(
Expand Down Expand Up @@ -202,6 +212,10 @@ class ARROW_EXPORT CPUMemoryManager : public MemoryManager {
Result<std::shared_ptr<Buffer>> CopyBufferTo(
const std::shared_ptr<Buffer>& buf,
const std::shared_ptr<MemoryManager>& to) override;
Result<std::unique_ptr<Buffer>> CopyNonOwnedFrom(
const Buffer& buf, const std::shared_ptr<MemoryManager>& from) override;
Result<std::unique_ptr<Buffer>> CopyNonOwnedTo(
const Buffer& buf, const std::shared_ptr<MemoryManager>& to) override;
Result<std::shared_ptr<Buffer>> ViewBufferFrom(
const std::shared_ptr<Buffer>& buf,
const std::shared_ptr<MemoryManager>& from) override;
Expand Down
36 changes: 22 additions & 14 deletions cpp/src/arrow/gpu/cuda_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,14 +302,14 @@ Result<std::shared_ptr<io::RandomAccessFile>> CudaMemoryManager::GetBufferReader
"for device ",
buf->device()->ToString());
}
return std::make_shared<CudaBufferReader>(checked_pointer_cast<CudaBuffer>(buf));
return std::make_shared<CudaBufferReader>(buf);
}

Result<std::shared_ptr<io::OutputStream>> CudaMemoryManager::GetBufferWriter(
std::shared_ptr<Buffer> buf) {
if (*buf->device() != *device_) {
return Status::Invalid(
"CudaMemoryManager::GetBufferReader called on foreign buffer "
"CudaMemoryManager::GetBufferWriter called on foreign buffer "
"for device ",
buf->device()->ToString());
}
Expand All @@ -327,27 +327,36 @@ Result<std::unique_ptr<Buffer>> CudaMemoryManager::AllocateBuffer(int64_t size)

Result<std::shared_ptr<Buffer>> CudaMemoryManager::CopyBufferTo(
const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& to) {
return CopyNonOwnedTo(*buf, to);
}

Result<std::unique_ptr<Buffer>> CudaMemoryManager::CopyNonOwnedTo(
const Buffer& buf, const std::shared_ptr<MemoryManager>& to) {
if (to->is_cpu()) {
// Device-to-CPU copy
std::shared_ptr<Buffer> dest;
std::unique_ptr<Buffer> dest;
ARROW_ASSIGN_OR_RAISE(auto from_context, cuda_device()->GetContext());
ARROW_ASSIGN_OR_RAISE(dest, to->AllocateBuffer(buf->size()));
RETURN_NOT_OK(from_context->CopyDeviceToHost(dest->mutable_data(), buf->address(),
buf->size()));
ARROW_ASSIGN_OR_RAISE(dest, to->AllocateBuffer(buf.size()));
RETURN_NOT_OK(
from_context->CopyDeviceToHost(dest->mutable_data(), buf.address(), buf.size()));
return dest;
}
return nullptr;
}

Result<std::shared_ptr<Buffer>> CudaMemoryManager::CopyBufferFrom(
const std::shared_ptr<Buffer>& buf, const std::shared_ptr<MemoryManager>& from) {
// TODO: remove these or just make them base class
return CopyNonOwnedFrom(*buf, from);
}

Result<std::unique_ptr<Buffer>> CudaMemoryManager::CopyNonOwnedFrom(
const Buffer& buf, const std::shared_ptr<MemoryManager>& from) {
if (from->is_cpu()) {
// CPU-to-device copy
ARROW_ASSIGN_OR_RAISE(auto to_context, cuda_device()->GetContext());
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> dest,
to_context->Allocate(buf->size()));
RETURN_NOT_OK(
to_context->CopyHostToDevice(dest->address(), buf->data(), buf->size()));
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> dest, to_context->Allocate(buf.size()));
RETURN_NOT_OK(to_context->CopyHostToDevice(dest->address(), buf.data(), buf.size()));
return dest;
}
if (IsCudaMemoryManager(*from)) {
Expand All @@ -356,16 +365,15 @@ Result<std::shared_ptr<Buffer>> CudaMemoryManager::CopyBufferFrom(
ARROW_ASSIGN_OR_RAISE(
auto from_context,
checked_cast<const CudaMemoryManager&>(*from).cuda_device()->GetContext());
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> dest,
to_context->Allocate(buf->size()));
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> dest, to_context->Allocate(buf.size()));
if (to_context->handle() == from_context->handle()) {
// Same context
RETURN_NOT_OK(
to_context->CopyDeviceToDevice(dest->address(), buf->address(), buf->size()));
to_context->CopyDeviceToDevice(dest->address(), buf.address(), buf.size()));
} else {
// Other context
RETURN_NOT_OK(from_context->CopyDeviceToAnotherDevice(to_context, dest->address(),
buf->address(), buf->size()));
buf.address(), buf.size()));
}
return dest;
}
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/gpu/cuda_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ class ARROW_EXPORT CudaMemoryManager : public MemoryManager {
Result<std::shared_ptr<Buffer>> CopyBufferTo(
const std::shared_ptr<Buffer>& buf,
const std::shared_ptr<MemoryManager>& to) override;
Result<std::unique_ptr<Buffer>> CopyNonOwnedFrom(
const Buffer& buf, const std::shared_ptr<MemoryManager>& from) override;
Result<std::unique_ptr<Buffer>> CopyNonOwnedTo(
const Buffer& buf, const std::shared_ptr<MemoryManager>& to) override;
Result<std::shared_ptr<Buffer>> ViewBufferFrom(
const std::shared_ptr<Buffer>& buf,
const std::shared_ptr<MemoryManager>& from) override;
Expand Down
19 changes: 18 additions & 1 deletion cpp/src/arrow/gpu/cuda_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,35 @@ TEST_F(TestCudaDevice, Copy) {
ASSERT_EQ(other_buffer->device(), device_);
AssertCudaBufferEquals(*other_buffer, "some data");

// Copy non-owned
ASSERT_OK_AND_ASSIGN(other_buffer, Buffer::CopyNonOwned(*cpu_buffer, mm_));
ASSERT_EQ(other_buffer->device(), device_);
AssertCudaBufferEquals(*other_buffer, "some data");

// device -> CPU
ASSERT_OK_AND_ASSIGN(cpu_buffer, Buffer::Copy(other_buffer, cpu_mm_));
ASSERT_TRUE(cpu_buffer->device()->is_cpu());
AssertBufferEqual(*cpu_buffer, "some data");

// Copy non-owned
ASSERT_OK_AND_ASSIGN(cpu_buffer, Buffer::CopyNonOwned(*other_buffer, cpu_mm_));
ASSERT_TRUE(cpu_buffer->device()->is_cpu());
AssertBufferEqual(*cpu_buffer, "some data");

// device -> device
const auto old_address = other_buffer->address();
auto old_address = other_buffer->address();
ASSERT_OK_AND_ASSIGN(other_buffer, Buffer::Copy(other_buffer, mm_));
ASSERT_EQ(other_buffer->device(), device_);
ASSERT_NE(other_buffer->address(), old_address);
AssertCudaBufferEquals(*other_buffer, "some data");

// Copy non-owned
old_address = other_buffer->address();
ASSERT_OK_AND_ASSIGN(other_buffer, Buffer::CopyNonOwned(*other_buffer, mm_));
ASSERT_EQ(other_buffer->device(), device_);
ASSERT_NE(other_buffer->address(), old_address);
AssertCudaBufferEquals(*other_buffer, "some data");

// device (other context) -> device
ASSERT_OK_AND_ASSIGN(auto other_context, NonPrimaryContext());
ASSERT_OK_AND_ASSIGN(std::shared_ptr<CudaBuffer> cuda_buffer,
Expand Down

0 comments on commit d94365f

Please sign in to comment.