GH-40783: [C++] Re-order loads and stores in MemoryPoolStats update (apache#40647)

### Rationale for this change

Issue atomic loads as early as possible so that the latency of waiting on memory is masked by other, independent operations.

### What changes are included in this PR?

 - Make all the read-modify-write operations use `memory_order_acq_rel`
 - Make all the plain loads and stores use `memory_order_acquire` and `memory_order_release`, respectively
 - Statically specialize the implementation of `UpdateAllocatedBytes` by splitting it into `DidAllocateBytes`, `DidReallocateBytes`, and `DidFreeBytes`, so that `bytes_allocated_` can be updated without waiting on the load of the old value (a simplified sketch of the pattern follows)
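
In the hot path, the change is essentially "start the `max_memory_` load early, update the counters, then reconcile the maximum". A simplified before/after sketch (not the exact Arrow code; see the `cpp/src/arrow/memory_pool.h` diff below for the real implementation):

```cpp
#include <atomic>
#include <cstdint>

std::atomic<int64_t> bytes_allocated{0};
std::atomic<int64_t> max_memory{0};

// Before: the max_memory check cannot start until fetch_add has produced
// the new counter value, and every access defaults to memory_order_seq_cst.
void UpdateAllocatedBytesOld(int64_t diff) {
  int64_t allocated = bytes_allocated.fetch_add(diff) + diff;
  if (diff > 0 && allocated > max_memory) {
    max_memory = allocated;
  }
}

// After: the max_memory load is issued first (relaxed is enough because the
// value only grows), so it can overlap with the fetch_add; the maximum is
// then raised with a CAS loop instead of a racy load/store pair.
void DidAllocateBytesNew(int64_t size) {
  int64_t max = max_memory.load(std::memory_order_relaxed);
  const int64_t allocated =
      bytes_allocated.fetch_add(size, std::memory_order_acq_rel) + size;
  while (max < allocated &&
         !max_memory.compare_exchange_weak(max, allocated,
                                           std::memory_order_acq_rel)) {
  }
}
```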

### Are these changes tested?

By existing tests.

* GitHub Issue: apache#40783

Authored-by: Felipe Oliveira Carvalho <[email protected]>
Signed-off-by: Felipe Oliveira Carvalho <[email protected]>
felipecrv authored Mar 26, 2024
1 parent e75bc99 commit e3b0bd1
Showing 7 changed files with 80 additions and 54 deletions.
12 changes: 6 additions & 6 deletions cpp/src/arrow/memory_pool.cc
```diff
@@ -472,7 +472,7 @@ class BaseMemoryPoolImpl : public MemoryPool {
     }
 #endif
 
-    stats_.UpdateAllocatedBytes(size);
+    stats_.DidAllocateBytes(size);
     return Status::OK();
   }
 
@@ -494,7 +494,7 @@ class BaseMemoryPoolImpl : public MemoryPool {
     }
 #endif
 
-    stats_.UpdateAllocatedBytes(new_size - old_size);
+    stats_.DidReallocateBytes(old_size, new_size);
     return Status::OK();
   }
 
@@ -509,7 +509,7 @@ class BaseMemoryPoolImpl : public MemoryPool {
 #endif
     Allocator::DeallocateAligned(buffer, size, alignment);
 
-    stats_.UpdateAllocatedBytes(-size, /*is_free*/ true);
+    stats_.DidFreeBytes(size);
   }
 
   void ReleaseUnused() override { Allocator::ReleaseUnused(); }
@@ -761,20 +761,20 @@ class ProxyMemoryPool::ProxyMemoryPoolImpl {
 
   Status Allocate(int64_t size, int64_t alignment, uint8_t** out) {
     RETURN_NOT_OK(pool_->Allocate(size, alignment, out));
-    stats_.UpdateAllocatedBytes(size);
+    stats_.DidAllocateBytes(size);
    return Status::OK();
   }
 
   Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
                     uint8_t** ptr) {
     RETURN_NOT_OK(pool_->Reallocate(old_size, new_size, alignment, ptr));
-    stats_.UpdateAllocatedBytes(new_size - old_size);
+    stats_.DidReallocateBytes(old_size, new_size);
     return Status::OK();
   }
 
   void Free(uint8_t* buffer, int64_t size, int64_t alignment) {
     pool_->Free(buffer, size, alignment);
-    stats_.UpdateAllocatedBytes(-size, /*is_free=*/true);
+    stats_.DidFreeBytes(size);
   }
 
   int64_t bytes_allocated() const { return stats_.bytes_allocated(); }
```
82 changes: 53 additions & 29 deletions cpp/src/arrow/memory_pool.h
```diff
@@ -35,44 +35,68 @@ namespace internal {
 ///////////////////////////////////////////////////////////////////////
 // Helper tracking memory statistics
 
-class MemoryPoolStats {
- public:
-  MemoryPoolStats() : bytes_allocated_(0), max_memory_(0) {}
-
-  int64_t max_memory() const { return max_memory_.load(); }
-
-  int64_t bytes_allocated() const { return bytes_allocated_.load(); }
-
-  int64_t total_bytes_allocated() const { return total_allocated_bytes_.load(); }
-
-  int64_t num_allocations() const { return num_allocs_.load(); }
-
-  inline void UpdateAllocatedBytes(int64_t diff, bool is_free = false) {
-    auto allocated = bytes_allocated_.fetch_add(diff) + diff;
-    // "maximum" allocated memory is ill-defined in multi-threaded code,
-    // so don't try to be too rigorous here
-    if (diff > 0 && allocated > max_memory_) {
-      max_memory_ = allocated;
-    }
-
-    // Reallocations might just expand/contract the allocation in place or might
-    // copy to a new location. We can't really know, so we just represent the
-    // optimistic case.
-    if (diff > 0) {
-      total_allocated_bytes_ += diff;
-    }
-
-    // We count any reallocation as a allocation.
-    if (!is_free) {
-      num_allocs_ += 1;
-    }
-  }
-
- protected:
-  std::atomic<int64_t> bytes_allocated_ = 0;
-  std::atomic<int64_t> max_memory_ = 0;
-  std::atomic<int64_t> total_allocated_bytes_ = 0;
-  std::atomic<int64_t> num_allocs_ = 0;
+/// \brief Memory pool statistics
+///
+/// 64-byte aligned so that all atomic values are on the same cache line.
+class alignas(64) MemoryPoolStats {
+ private:
+  // All atomics are updated according to Acquire-Release ordering.
+  // https://en.cppreference.com/w/cpp/atomic/memory_order#Release-Acquire_ordering
+  //
+  // max_memory_, total_allocated_bytes_, and num_allocs_ only go up (they are
+  // monotonically increasing), which can allow some optimizations.
+  std::atomic<int64_t> max_memory_{0};
+  std::atomic<int64_t> bytes_allocated_{0};
+  std::atomic<int64_t> total_allocated_bytes_{0};
+  std::atomic<int64_t> num_allocs_{0};
+
+ public:
+  int64_t max_memory() const { return max_memory_.load(std::memory_order_acquire); }
+
+  int64_t bytes_allocated() const {
+    return bytes_allocated_.load(std::memory_order_acquire);
+  }
+
+  int64_t total_bytes_allocated() const {
+    return total_allocated_bytes_.load(std::memory_order_acquire);
+  }
+
+  int64_t num_allocations() const { return num_allocs_.load(std::memory_order_acquire); }
+
+  inline void DidAllocateBytes(int64_t size) {
+    // Issue the load before everything else. max_memory_ is monotonically increasing,
+    // so we can use a relaxed load before the read-modify-write.
+    auto max_memory = max_memory_.load(std::memory_order_relaxed);
+    const auto old_bytes_allocated =
+        bytes_allocated_.fetch_add(size, std::memory_order_acq_rel);
+    // Issue store operations on values that we don't depend on to proceed
+    // with execution. When done, max_memory and old_bytes_allocated have
+    // a higher chance of being available on CPU registers. This also has the
+    // nice side-effect of putting 3 atomic stores close to each other in the
+    // instruction stream.
+    total_allocated_bytes_.fetch_add(size, std::memory_order_acq_rel);
+    num_allocs_.fetch_add(1, std::memory_order_acq_rel);
+
+    // If other threads are updating max_memory_ concurrently we leave the loop without
+    // updating, knowing that it already reached a value even higher than ours.
+    const auto allocated = old_bytes_allocated + size;
+    while (max_memory < allocated && !max_memory_.compare_exchange_weak(
+                                         /*expected=*/max_memory, /*desired=*/allocated,
+                                         std::memory_order_acq_rel)) {
+    }
+  }
+
+  inline void DidReallocateBytes(int64_t old_size, int64_t new_size) {
+    if (new_size > old_size) {
+      DidAllocateBytes(new_size - old_size);
+    } else {
+      DidFreeBytes(old_size - new_size);
+    }
+  }
+
+  inline void DidFreeBytes(int64_t size) {
+    bytes_allocated_.fetch_sub(size, std::memory_order_acq_rel);
+  }
 };
 
 }  // namespace internal
```
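As a quick sanity check of the new counting semantics, here is a small standalone sketch (the include path and the direct use of the internal class are assumptions made for illustration). Note that a shrinking reallocation is now routed to `DidFreeBytes` and no longer counted as an allocation, which is the likely reason `num_allocations` drops from 4 to 3 in the updated test below:

```cpp
#include <cassert>

#include "arrow/memory_pool.h"  // assumed location of arrow::internal::MemoryPoolStats

int main() {
  arrow::internal::MemoryPoolStats stats;

  stats.DidAllocateBytes(100);  // allocation #1
  stats.DidAllocateBytes(50);   // allocation #2
  // A growing reallocation is counted as allocation #3 (+50 bytes)...
  stats.DidReallocateBytes(/*old_size=*/100, /*new_size=*/150);
  // ...but a shrinking reallocation only decreases bytes_allocated.
  stats.DidReallocateBytes(/*old_size=*/150, /*new_size=*/100);

  assert(stats.bytes_allocated() == 150);
  assert(stats.max_memory() == 200);
  assert(stats.total_bytes_allocated() == 200);
  assert(stats.num_allocations() == 3);

  stats.DidFreeBytes(100);
  stats.DidFreeBytes(50);
  assert(stats.bytes_allocated() == 0);
  return 0;
}
```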
8 changes: 6 additions & 2 deletions cpp/src/arrow/memory_pool_benchmark.cc
```diff
@@ -114,8 +114,12 @@ static void AllocateTouchDeallocate(
   state.SetBytesProcessed(state.iterations() * nbytes);
 }
 
-#define BENCHMARK_ALLOCATE_ARGS \
-  ->RangeMultiplier(16)->Range(4096, 16 * 1024 * 1024)->ArgName("size")->UseRealTime()
+#define BENCHMARK_ALLOCATE_ARGS   \
+  ->RangeMultiplier(16)           \
+  ->Range(4096, 16 * 1024 * 1024) \
+  ->ArgName("size")               \
+  ->UseRealTime()                 \
+  ->ThreadRange(1, 32)
 
 #define BENCHMARK_ALLOCATE(benchmark_func, template_param) \
   BENCHMARK_TEMPLATE(benchmark_func, template_param) BENCHMARK_ALLOCATE_ARGS
```
11 changes: 2 additions & 9 deletions cpp/src/arrow/memory_pool_test.cc
```diff
@@ -106,11 +106,6 @@ TEST(DefaultMemoryPool, Identity) {
                specific_pools.end());
 }
 
-// Death tests and valgrind are known to not play well 100% of the time. See
-// googletest documentation
-#if !(defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER))
-
-// TODO: is this still a death test?
 TEST(DefaultMemoryPoolDeathTest, Statistics) {
   MemoryPool* pool = default_memory_pool();
   uint8_t* data1;
@@ -137,18 +132,16 @@ TEST(DefaultMemoryPoolDeathTest, Statistics) {
   ASSERT_EQ(150, pool->max_memory());
   ASSERT_EQ(200, pool->total_bytes_allocated());
   ASSERT_EQ(50, pool->bytes_allocated());
-  ASSERT_EQ(4, pool->num_allocations());
+  ASSERT_EQ(3, pool->num_allocations());
 
   pool->Free(data1, 50);
 
   ASSERT_EQ(150, pool->max_memory());
   ASSERT_EQ(200, pool->total_bytes_allocated());
   ASSERT_EQ(0, pool->bytes_allocated());
-  ASSERT_EQ(4, pool->num_allocations());
+  ASSERT_EQ(3, pool->num_allocations());
 }
 
-#endif // ARROW_VALGRIND
-
 TEST(LoggingMemoryPool, Logging) {
   auto pool = MemoryPool::CreateDefault();
 
```
9 changes: 5 additions & 4 deletions cpp/src/arrow/memory_pool_test.h
```diff
@@ -38,19 +38,20 @@ class TestMemoryPoolBase : public ::testing::Test {
     auto pool = memory_pool();
 
     uint8_t* data;
+    const auto old_bytes_allocated = pool->bytes_allocated();
     ASSERT_OK(pool->Allocate(100, &data));
     EXPECT_EQ(static_cast<uint64_t>(0), reinterpret_cast<uint64_t>(data) % 64);
-    ASSERT_EQ(100, pool->bytes_allocated());
+    ASSERT_EQ(old_bytes_allocated + 100, pool->bytes_allocated());
 
     uint8_t* data2;
     ASSERT_OK(pool->Allocate(27, &data2));
     EXPECT_EQ(static_cast<uint64_t>(0), reinterpret_cast<uint64_t>(data2) % 64);
-    ASSERT_EQ(127, pool->bytes_allocated());
+    ASSERT_EQ(old_bytes_allocated + 127, pool->bytes_allocated());
 
     pool->Free(data, 100);
-    ASSERT_EQ(27, pool->bytes_allocated());
+    ASSERT_EQ(old_bytes_allocated + 27, pool->bytes_allocated());
     pool->Free(data2, 27);
-    ASSERT_EQ(0, pool->bytes_allocated());
+    ASSERT_EQ(old_bytes_allocated, pool->bytes_allocated());
   }
 
   void TestOOM() {
```
6 changes: 3 additions & 3 deletions cpp/src/arrow/stl_allocator.h
```diff
@@ -110,7 +110,7 @@ class STLMemoryPool : public MemoryPool {
     } catch (std::bad_alloc& e) {
       return Status::OutOfMemory(e.what());
     }
-    stats_.UpdateAllocatedBytes(size);
+    stats_.DidAllocateBytes(size);
     return Status::OK();
   }
 
@@ -124,13 +124,13 @@ class STLMemoryPool : public MemoryPool {
     }
     memcpy(*ptr, old_ptr, std::min(old_size, new_size));
     alloc_.deallocate(old_ptr, old_size);
-    stats_.UpdateAllocatedBytes(new_size - old_size);
+    stats_.DidReallocateBytes(old_size, new_size);
     return Status::OK();
   }
 
   void Free(uint8_t* buffer, int64_t size, int64_t /*alignment*/) override {
     alloc_.deallocate(buffer, size);
-    stats_.UpdateAllocatedBytes(-size, /*is_free=*/true);
+    stats_.DidFreeBytes(size);
   }
 
   int64_t bytes_allocated() const override { return stats_.bytes_allocated(); }
```
6 changes: 5 additions & 1 deletion java/dataset/src/main/cpp/jni_util.cc
```diff
@@ -97,7 +97,11 @@ class ReservationListenableMemoryPool::Impl {
 
   int64_t Reserve(int64_t diff) {
     std::lock_guard<std::mutex> lock(mutex_);
-    stats_.UpdateAllocatedBytes(diff);
+    if (diff > 0) {
+      stats_.DidAllocateBytes(diff);
+    } else if (diff < 0) {
+      stats_.DidFreeBytes(-diff);
+    }
     int64_t new_block_count;
     int64_t bytes_reserved = stats_.bytes_allocated();
     if (bytes_reserved == 0) {
```
