Skip to content

Commit

Permalink
[rpc] remove last vestiges of chromium Atomics from RPC
Browse files Browse the repository at this point in the history
Change-Id: I6039b4a08615339c8f06a4d215a0b1058a9bacea
Reviewed-on: http://gerrit.cloudera.org:8080/21513
Tested-by: Kudu Jenkins
Reviewed-by: Marton Greber <[email protected]>
  • Loading branch information
alexeyserbin committed Jun 14, 2024
1 parent 3e43ae9 commit 5405d06
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 25 deletions.
6 changes: 3 additions & 3 deletions src/kudu/rpc/proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ void Proxy::AsyncRequest(const string& method,
RpcController* controller,
const ResponseCallback& callback) {
DCHECK(!controller->call_) << "Controller should be reset";
base::subtle::NoBarrier_Store(&is_started_, true);
is_started_.store(true, std::memory_order_relaxed);
// TODO(awong): it would be great if we didn't have to heap allocate the
// payload.
auto req_payload = RequestPayload::CreateRequestPayload(
Expand Down Expand Up @@ -269,13 +269,13 @@ Status Proxy::SyncRequest(const string& method,
}

void Proxy::set_user_credentials(UserCredentials user_credentials) {
DCHECK(base::subtle::NoBarrier_Load(&is_started_) == false)
DCHECK(is_started_.load(std::memory_order_relaxed) == false)
<< "illegal to call set_user_credentials() after request processing has started";
conn_id_.set_user_credentials(std::move(user_credentials));
}

void Proxy::set_network_plane(string network_plane) {
DCHECK(base::subtle::NoBarrier_Load(&is_started_) == false)
DCHECK(is_started_.load(std::memory_order_relaxed) == false)
<< "illegal to call set_network_plane() after request processing has started";
conn_id_.set_network_plane(std::move(network_plane));
}
Expand Down
4 changes: 2 additions & 2 deletions src/kudu/rpc/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
// under the License.
#pragma once

#include <atomic>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/macros.h"
#include "kudu/rpc/connection_id.h"
#include "kudu/rpc/outbound_call.h"
Expand Down Expand Up @@ -189,7 +189,7 @@ class Proxy {
mutable simple_spinlock lock_;
ConnectionId conn_id_;

mutable Atomic32 is_started_;
std::atomic<bool> is_started_;

DISALLOW_COPY_AND_ASSIGN(Proxy);
};
Expand Down
15 changes: 7 additions & 8 deletions src/kudu/rpc/rpc-bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
Expand Down Expand Up @@ -96,8 +95,8 @@ class RpcBench : public RpcTestBase {
public:
RpcBench()
: should_run_(true),
stop_(0)
{}
stop_(0) {
}

void SetUp() override {
RpcTestBase::SetUp();
Expand Down Expand Up @@ -149,7 +148,7 @@ class RpcBench : public RpcTestBase {
friend class ClientAsyncWorkload;

Sockaddr server_addr_;
Atomic32 should_run_;
atomic<bool> should_run_;
CountDownLatch stop_;
};

Expand All @@ -176,7 +175,7 @@ class ClientThread {

AddRequestPB req;
AddResponsePB resp;
while (Acquire_Load(&bench_->should_run_)) {
while (bench_->should_run_) {
req.set_x(request_count_);
req.set_y(request_count_);
RpcController controller;
Expand Down Expand Up @@ -205,7 +204,7 @@ TEST_F(RpcBench, BenchmarkCalls) {
}

SleepFor(MonoDelta::FromSeconds(FLAGS_run_seconds));
Release_Store(&should_run_, false);
should_run_ = false;

int total_reqs = 0;

Expand Down Expand Up @@ -233,7 +232,7 @@ class ClientAsyncWorkload {
CHECK_OK(controller_.status());
CHECK_EQ(req_.x() + req_.y(), resp_.result());
}
if (!Acquire_Load(&bench_->should_run_)) {
if (!bench_->should_run_) {
bench_->stop_.CountDown();
return;
}
Expand Down Expand Up @@ -287,7 +286,7 @@ TEST_F(RpcBench, BenchmarkCallsAsync) {
}

SleepFor(MonoDelta::FromSeconds(FLAGS_run_seconds));
Release_Store(&should_run_, false);
should_run_ = false;

sw.stop();

Expand Down
17 changes: 5 additions & 12 deletions src/kudu/rpc/rpc_stub-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <algorithm>
#include <atomic>
#include <csignal>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
Expand All @@ -35,7 +36,6 @@
#include <glog/stl_logging.h>
#include <gtest/gtest.h>

#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/rpc/proxy.h"
#include "kudu/rpc/rpc-test-base.h"
Expand Down Expand Up @@ -75,7 +75,6 @@ using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using base::subtle::NoBarrier_Load;

namespace kudu {
namespace rpc {
Expand Down Expand Up @@ -302,12 +301,6 @@ TEST_F(RpcStubTest, TestCallWithInvalidParam) {
"missing fields: y");
}

// Wrapper around AtomicIncrement, since AtomicIncrement returns the 'old'
// value, and our callback needs to be a void function.
static void DoIncrement(Atomic32* count) {
base::subtle::Barrier_AtomicIncrement(count, 1);
}

// Test sending a PB parameter with a missing field on the client side.
// This also ensures that the async callback is only called once
// (regression test for a previously-encountered bug).
Expand All @@ -319,13 +312,13 @@ TEST_F(RpcStubTest, TestCallWithMissingPBFieldClientSide) {
req.set_x(10);
// Request is missing the 'y' field.
AddResponsePB resp;
Atomic32 callback_count = 0;
p.AddAsync(req, &resp, &controller, [&callback_count]() { DoIncrement(&callback_count); });
while (NoBarrier_Load(&callback_count) == 0) {
std::atomic<uint32_t> callback_count(0);
p.AddAsync(req, &resp, &controller, [&callback_count]() { ++callback_count; });
while (callback_count == 0) {
SleepFor(MonoDelta::FromMicroseconds(10));
}
SleepFor(MonoDelta::FromMicroseconds(100));
ASSERT_EQ(1, NoBarrier_Load(&callback_count));
ASSERT_EQ(1, callback_count);
ASSERT_STR_CONTAINS(controller.status().ToString(),
"Invalid argument: invalid parameter for call "
"kudu.rpc_test.CalculatorService.Add: missing fields: y");
Expand Down

0 comments on commit 5405d06

Please sign in to comment.