Skip to content

Commit

Permalink
status_callback: replace Bind usage with lambdas
Browse files Browse the repository at this point in the history
StatusCallback was another significant user of kudu::Bind. I've removed it
in favor of StdStatusCallback, which has been renamed to StatusCallback. All
affected Bind users were converted into lambdas.

Change-Id: I66e90b8144b2a6b03b1e102dd4ca93f8e232b1a9
Reviewed-on: http://gerrit.cloudera.org:8080/15562
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
Reviewed-by: Andrew Wong <[email protected]>
  • Loading branch information
adembo committed Mar 28, 2020
1 parent 5ce3a02 commit be2d7c5
Show file tree
Hide file tree
Showing 30 changed files with 133 additions and 161 deletions.
33 changes: 15 additions & 18 deletions src/kudu/client/authz_token_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,12 @@
#include <unordered_map>
#include <vector>

#include <boost/bind.hpp> // IWYU pragma: keep
#include <glog/logging.h>

#include "kudu/client/client-internal.h"
#include "kudu/client/client.h"
#include "kudu/client/master_proxy_rpc.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/callback.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
Expand All @@ -47,28 +43,29 @@
#include "kudu/util/status.h"
#include "kudu/util/status_callback.h"

using kudu::master::MasterFeatures;
using kudu::master::MasterServiceProxy;
using kudu::rpc::BackoffType;
using kudu::rpc::ResponseCallback;
using kudu::security::SignedTokenPB;
using std::string;
using std::vector;
using strings::Substitute;

namespace kudu {

using master::MasterFeatures;
using master::MasterServiceProxy;
using rpc::BackoffType;
using security::SignedTokenPB;

namespace client {
namespace internal {

RetrieveAuthzTokenRpc::RetrieveAuthzTokenRpc(const KuduTable* table,
MonoTime deadline)
: AsyncLeaderMasterRpc(deadline, table->client(), BackoffType::LINEAR, req_, &resp_,
&MasterServiceProxy::GetTableSchemaAsync, "RetrieveAuthzToken",
Bind(&AuthzTokenCache::RetrievedNewAuthzTokenCb,
Unretained(&table->client()->data_->authz_token_cache_),
table->id()),
{ MasterFeatures::GENERATE_AUTHZ_TOKEN }),
: AsyncLeaderMasterRpc(
deadline, table->client(), BackoffType::LINEAR, req_, &resp_,
&MasterServiceProxy::GetTableSchemaAsync, "RetrieveAuthzToken",
[table](const Status& s) {
table->client()->data_->authz_token_cache_.RetrievedNewAuthzTokenCb(
table->id(), s);
},
{ MasterFeatures::GENERATE_AUTHZ_TOKEN }),
table_(table) {
req_.mutable_table()->set_table_id(table_->id());
}
Expand Down Expand Up @@ -98,7 +95,7 @@ void RetrieveAuthzTokenRpc::SendRpcCb(const Status& status) {
client_->data_->authz_token_cache_.Put(table_->id(), resp_.authz_token());
}
}
user_cb_.Run(new_status);
user_cb_(new_status);
}

void AuthzTokenCache::Put(const string& table_id, SignedTokenPB authz_token) {
Expand Down Expand Up @@ -154,7 +151,7 @@ void AuthzTokenCache::RetrievedNewAuthzTokenCb(const string& table_id,
}
DCHECK(!cbs.empty());
for (const auto& cb : cbs) {
cb.Run(status);
cb(status);
}
}

Expand Down
10 changes: 6 additions & 4 deletions src/kudu/client/batcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "kudu/client/batcher.h"

#include <cstddef>
#include <functional>
#include <mutex>
#include <ostream>
#include <string>
Expand All @@ -44,7 +45,6 @@
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/gutil/atomic_refcount.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
Expand Down Expand Up @@ -544,7 +544,7 @@ bool WriteRpc::GetNewAuthnTokenAndRetry() {
KuduClient* c = batcher_->client_;
VLOG(1) << "Retrieving new authn token from master";
c->data_->ConnectToClusterAsync(c, retrier().deadline(),
Bind(&WriteRpc::GotNewAuthnTokenRetryCb, Unretained(this)),
[this](const Status& s) { this->GotNewAuthnTokenRetryCb(s); },
CredentialsPolicy::PRIMARY_CREDENTIALS);
return true;
}
Expand All @@ -555,7 +555,7 @@ bool WriteRpc::GetNewAuthzTokenAndRetry() {
KuduClient* c = batcher_->client_;
VLOG(1) << "Retrieving new authz token from master";
c->data_->RetrieveAuthzTokenAsync(table(),
Bind(&WriteRpc::GotNewAuthzTokenRetryCb, Unretained(this)),
[this](const Status& s) { this->GotNewAuthzTokenRetryCb(s); },
retrier().deadline());
return true;
}
Expand Down Expand Up @@ -722,15 +722,17 @@ Status Batcher::Add(KuduWriteOperation* write_op) {
//
// deadline_ is set in FlushAsync(), after all Add() calls are done, so
// here we're forced to create a new deadline.
auto op_raw = op.get();
MonoTime deadline = ComputeDeadlineUnlocked();
base::RefCountInc(&outstanding_lookups_);
scoped_refptr<Batcher> self(this);
client_->data_->meta_cache_->LookupTabletByKey(
op->write_op->table(),
std::move(partition_key),
deadline,
MetaCache::LookupType::kPoint,
&op->tablet,
Bind(&Batcher::TabletLookupFinished, this, op.get()));
[self, op_raw](const Status& s) { self->TabletLookupFinished(op_raw, s); });
IgnoreResult(op.release());

buffer_bytes_used_.IncrementBy(write_op->SizeInBuffer());
Expand Down
8 changes: 4 additions & 4 deletions src/kudu/client/client-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ void KuduClient::Data::ConnectedToClusterCb(
}

for (const StatusCallback& cb : cbs) {
cb.Run(status);
cb(status);
}
}

Expand Down Expand Up @@ -676,12 +676,12 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* client,
s = dns_resolver_->ResolveAddresses(hp, &addrs);
}
if (!s.ok()) {
cb.Run(s);
cb(s);
return;
}
if (addrs.empty()) {
cb.Run(Status::InvalidArgument(Substitute("No master address specified by '$0'",
master_server_addr)));
cb(Status::InvalidArgument(Substitute("No master address specified by '$0'",
master_server_addr)));
return;
}
if (addrs.size() > 1) {
Expand Down
7 changes: 2 additions & 5 deletions src/kudu/client/master_proxy_rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
#include "kudu/client/client-internal.h"
#include "kudu/client/client.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/master.pb.h"
#include "kudu/rpc/response_callback.h"
Expand Down Expand Up @@ -148,7 +146,7 @@ void AsyncLeaderMasterRpc<ReqClass, RespClass>::SendRpcCb(const Status& status)
if (s.ok() && resp_->has_error()) {
s = StatusFromPB(resp_->error().status());
}
user_cb_.Run(s);
user_cb_(s);
}

template <class ReqClass, class RespClass>
Expand Down Expand Up @@ -178,8 +176,7 @@ void AsyncLeaderMasterRpc<ReqClass, RespClass>::ResetMasterLeaderAndRetry(
// FATAL_INVALID_AUTHENTICATION_TOKEN error as well.
client_->data_->ConnectToClusterAsync(
client_, retrier().deadline(),
Bind(&AsyncLeaderMasterRpc<ReqClass, RespClass>::NewLeaderMasterDeterminedCb,
Unretained(this), creds_policy),
[=](const Status& s) { this->NewLeaderMasterDeterminedCb(creds_policy, s); },
creds_policy);
}

Expand Down
6 changes: 3 additions & 3 deletions src/kudu/client/master_rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/master.proxy.h"
Expand Down Expand Up @@ -226,7 +225,7 @@ void ConnectToMasterRpc::SendRpcCb(const Status& status) {
new_status = StatusFromPB(out_->error().status());
}
}
user_cb_.Run(new_status);
user_cb_(new_status);
}

} // anonymous namespace
Expand Down Expand Up @@ -277,8 +276,9 @@ void ConnectToClusterRpc::SendRpc() {

std::lock_guard<simple_spinlock> l(lock_);
for (int i = 0; i < addrs_with_names_.size(); i++) {
scoped_refptr<ConnectToClusterRpc> self(this);
ConnectToMasterRpc* rpc = new ConnectToMasterRpc(
Bind(&ConnectToClusterRpc::SingleNodeCallback, this, i),
[self, i](const Status& s) { self->SingleNodeCallback(i, s); },
addrs_with_names_[i],
actual_deadline,
retrier().messenger(),
Expand Down
34 changes: 16 additions & 18 deletions src/kudu/client/meta_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include <utility>
#include <vector>

#include <boost/bind.hpp> // IWYU pragma: keep
#include <glog/logging.h>
#include <google/protobuf/repeated_field.h> // IWYU pragma: keep

Expand All @@ -39,8 +38,6 @@
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/callback.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
Expand Down Expand Up @@ -99,7 +96,7 @@ void RemoteTabletServer::DnsResolutionFinished(const HostPort& hp,

if (!s.ok()) {
s = s.CloneAndPrepend("Failed to resolve address for TS " + uuid_);
user_callback.Run(s);
user_callback(s);
return;
}

Expand All @@ -111,7 +108,7 @@ void RemoteTabletServer::DnsResolutionFinished(const HostPort& hp,
proxy_.reset(new TabletServerServiceProxy(client->data_->messenger_, (*addrs)[0], hp.host()));
proxy_->set_user_credentials(client->data_->user_credentials_);
}
user_callback.Run(s);
user_callback(s);
}

void RemoteTabletServer::InitProxy(KuduClient* client, const StatusCallback& cb) {
Expand All @@ -122,7 +119,7 @@ void RemoteTabletServer::InitProxy(KuduClient* client, const StatusCallback& cb)
if (proxy_) {
// Already have a proxy created.
l.unlock();
cb.Run(Status::OK());
cb(Status::OK());
return;
}

Expand All @@ -134,8 +131,9 @@ void RemoteTabletServer::InitProxy(KuduClient* client, const StatusCallback& cb)

auto addrs = new vector<Sockaddr>();
client->data_->dns_resolver_->ResolveAddressesAsync(
hp, addrs, Bind(&RemoteTabletServer::DnsResolutionFinished,
Unretained(this), hp, addrs, client, cb));
hp, addrs, [=](const Status& s) {
this->DnsResolutionFinished(hp, addrs, client, cb, s);
});
}

void RemoteTabletServer::Update(const master::TSInfoPB& pb) {
Expand Down Expand Up @@ -457,17 +455,17 @@ void MetaCacheServerPicker::PickLeader(const ServerPickedCallback& callback,
deadline,
MetaCache::LookupType::kPoint,
nullptr,
Bind(&MetaCacheServerPicker::LookUpTabletCb, Unretained(this), callback, deadline));
[this, callback, deadline](const Status& s) {
this->LookUpTabletCb(callback, deadline, s);
});
return;
}

// If we have a current TS initialize the proxy.
// Make sure we have a working proxy before sending out the RPC.
leader->InitProxy(client_,
Bind(&MetaCacheServerPicker::InitProxyCb,
Unretained(this),
callback,
leader));
leader->InitProxy(client_, [this, callback, leader](const Status& s) {
this->InitProxyCb(callback, leader, s);
});
}

void MetaCacheServerPicker::MarkServerFailed(RemoteTabletServer* replica, const Status& status) {
Expand Down Expand Up @@ -672,7 +670,7 @@ void LookupRpc::SendRpc() {
Status fastpath_status = meta_cache_->DoFastPathLookup(
table_, &partition_key_, lookup_type_, remote_tablet_);
if (!fastpath_status.IsIncomplete()) {
user_cb_.Run(fastpath_status);
user_cb_(fastpath_status);
delete this;
return;
}
Expand Down Expand Up @@ -720,7 +718,7 @@ void LookupRpc::ResetMasterLeaderAndRetry(CredentialsPolicy creds_policy) {
table_->client()->data_->ConnectToClusterAsync(
table_->client(),
retrier().deadline(),
Bind(&LookupRpc::NewLeaderMasterDeterminedCb, Unretained(this), creds_policy),
[=](const Status& s) { this->NewLeaderMasterDeterminedCb(creds_policy, s); },
creds_policy);
}

Expand Down Expand Up @@ -764,7 +762,7 @@ void LookupRpc::SendRpcCb(const Status& status) {
new_status = new_status.CloneAndPrepend(Substitute("$0 failed", ToString()));
KLOG_EVERY_N_SECS(WARNING, 1) << new_status.ToString();
}
user_cb_.Run(new_status);
user_cb_(new_status);
}

Status MetaCache::ProcessLookupResponse(const LookupRpc& rpc,
Expand Down Expand Up @@ -1016,7 +1014,7 @@ void MetaCache::LookupTabletByKey(const KuduTable* table,
Status fastpath_status = DoFastPathLookup(
table, &partition_key, lookup_type, remote_tablet);
if (!fastpath_status.IsIncomplete()) {
callback.Run(fastpath_status);
callback(fastpath_status);
return;
}

Expand Down
5 changes: 2 additions & 3 deletions src/kudu/consensus/consensus-test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
#include "kudu/consensus/log.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
Expand Down Expand Up @@ -678,8 +677,8 @@ class TestDriver {
std::unique_ptr<CommitMsg> msg(new CommitMsg);
msg->set_op_type(round_->replicate_msg()->op_type());
msg->mutable_commited_op_id()->CopyFrom(round_->id());
CHECK_OK(log_->AsyncAppendCommit(std::move(msg),
Bind(&TestDriver::CommitCallback, Unretained(this))));
CHECK_OK(log_->AsyncAppendCommit(
std::move(msg), [this](const Status& s) { this->CommitCallback(s); }));
}

void CommitCallback(const Status& s) {
Expand Down
18 changes: 8 additions & 10 deletions src/kudu/consensus/consensus_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/dynamic_annotations.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
Expand Down Expand Up @@ -377,12 +375,13 @@ void PeerMessageQueue::LocalPeerAppendFinished(const OpId& id,
}
ResponseFromPeer(local_peer_pb_.permanent_uuid(), fake_response);

callback.Run(status);
callback(status);
}

Status PeerMessageQueue::AppendOperation(const ReplicateRefPtr& msg) {
return AppendOperations({ msg }, Bind(CrashIfNotOkStatusCB,
"Enqueued replicate operation failed to write to WAL"));
return AppendOperations({ msg }, [](const Status& s) {
CrashIfNotOkStatusCB("Enqueued replicate operation failed to write to WAL", s);
});
}

Status PeerMessageQueue::AppendOperations(const vector<ReplicateRefPtr>& msgs,
Expand Down Expand Up @@ -424,11 +423,10 @@ Status PeerMessageQueue::AppendOperations(const vector<ReplicateRefPtr>& msgs,
// for the log buffer to empty, it may need to call LocalPeerAppendFinished()
// which also needs queue_lock_.
lock.unlock();
RETURN_NOT_OK(log_cache_.AppendOperations(msgs,
Bind(&PeerMessageQueue::LocalPeerAppendFinished,
Unretained(this),
last_id,
log_append_callback)));
RETURN_NOT_OK(log_cache_.AppendOperations(
msgs, [this, last_id, log_append_callback](const Status& s) {
this->LocalPeerAppendFinished(last_id, log_append_callback, s);
}));
lock.lock();
DCHECK(last_id.IsInitialized());
queue_state_.last_appended = last_id;
Expand Down
Loading

0 comments on commit be2d7c5

Please sign in to comment.