Skip to content

Commit

Permalink
KUDU-2612: retry abort task on timeout
Browse files Browse the repository at this point in the history
It's possible that in the event of a node failure, the abort that
happens automatically to avoid a deadlock will timeout. Instead of
leaving the failure as is, this patch updates the task to be retried.

To expedite testing, this also introduces a new flag to the
TxnSystemClient to reduce the default timeout.

This is a follow-up to d21a0d3.

Change-Id: I303a9a8c85a2191594a22d907770a82da5060f19
Reviewed-on: http://gerrit.cloudera.org:8080/17357
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Kudu Jenkins
  • Loading branch information
andrwng committed Apr 28, 2021
1 parent 79c99ea commit 73310d9
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 11 deletions.
77 changes: 74 additions & 3 deletions src/kudu/integration-tests/txn_write_ops-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,10 @@ DECLARE_bool(txn_manager_enabled);
DECLARE_bool(txn_manager_lazily_initialized);
DECLARE_int32(txn_participant_begin_op_inject_latency_ms);
DECLARE_int32(txn_participant_registration_inject_latency_ms);
DECLARE_int64(txn_system_client_op_timeout_ms);
DECLARE_uint32(tablet_max_pending_txn_write_ops);
DECLARE_uint32(txn_manager_status_table_num_replicas);
DECLARE_uint32(txn_staleness_tracker_interval_ms);

namespace kudu {

Expand Down Expand Up @@ -930,10 +932,13 @@ class TxnOpDispatcherITest : public KuduTest {
CHECK_OK(BuildSchema(&schema_));
}

void Prepare(int num_tservers) {
void Prepare(int num_tservers, bool create_table = true, int num_replicas = 0) {
if (num_replicas == 0) {
num_replicas = num_tservers;
}
FLAGS_txn_manager_enabled = true;
FLAGS_txn_manager_lazily_initialized = false;
FLAGS_txn_manager_status_table_num_replicas = num_tservers;
FLAGS_txn_manager_status_table_num_replicas = num_replicas;

InternalMiniClusterOptions opts;
opts.num_tablet_servers = num_tservers;
Expand All @@ -943,7 +948,9 @@ class TxnOpDispatcherITest : public KuduTest {
KuduClientBuilder builder;
builder.default_admin_operation_timeout(kTimeout);
ASSERT_OK(cluster_->CreateClient(&builder, &client_));
ASSERT_OK(CreateTable(num_tservers));
if (create_table) {
ASSERT_OK(CreateTable(num_replicas));
}
for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
auto* ts = cluster_->mini_tablet_server(i);
ASSERT_OK(ts->WaitStarted());
Expand Down Expand Up @@ -1163,6 +1170,70 @@ TEST_F(TxnOpDispatcherITest, LifecycleBasic) {
}
}

// Test that the automatic abort to avoid deadlock gets retried if the op times
// out.
TEST_F(TxnOpDispatcherITest, TestRetryWaitDieAbortsWhenTServerUnavailable) {
SKIP_IF_SLOW_NOT_ALLOWED();

// Disable the staleness tracker so we know any aborts were done by the
// wait-die deadlock prevention.
FLAGS_txn_staleness_tracker_interval_ms = 0;
// Set a low system client timeout to make sure our abort task retries.
FLAGS_txn_system_client_op_timeout_ms = 1000;

NO_FATALS(Prepare(/*num_tservers*/2, /*create_table*/false, /*num_replicas*/1));
// First, figure out which tablet server hosts the TxnStatusManager.
tserver::MiniTabletServer* tsm_server = nullptr;
ASSERT_EVENTUALLY([&] {
for (int i = 0; i < cluster_->num_tablet_servers() && tsm_server == nullptr; i++) {
auto* mts = cluster_->mini_tablet_server(i);
auto* tablet_manager = mts->server()->tablet_manager();
vector<scoped_refptr<TabletReplica>> replicas;
tablet_manager->GetTabletReplicas(&replicas);
if (!replicas.empty()) {
tsm_server = mts;
}
}
ASSERT_FALSE(tsm_server == nullptr);
});

// Create a single-tablet table so shutting down the TxnStatusManager doesn't
// affect writes.
unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
ASSERT_OK(table_creator->table_name(kTableName)
.schema(&schema_)
.set_range_partition_columns({ "key" })
.num_replicas(1)
.Create());
ASSERT_OK(client_->OpenTable(kTableName, &table_));
shared_ptr<KuduTransaction> first_txn;
shared_ptr<KuduTransaction> second_txn;
ASSERT_OK(client_->NewTransaction(&first_txn));
ASSERT_OK(client_->NewTransaction(&second_txn));

int64_t key = 0;
ASSERT_OK(InsertRows(first_txn.get(), 1, &key));

// The second transaction should always fail because it's attempting to lock
// a tablet that's already locked.
Status s = InsertRows(second_txn.get(), 1, &key);
ASSERT_TRUE(s.IsIOError()) << s.ToString();

// Immediately shutdown, reducing the likelihood that the automatic abort
// task will complete. Then sleep for long enough that the system client
// would timeout and try again.
tsm_server->Shutdown();
SleepFor(MonoDelta::FromMilliseconds(3 * FLAGS_txn_system_client_op_timeout_ms));
ASSERT_OK(tsm_server->Restart());
ASSERT_EVENTUALLY([&] {
bool is_complete = false;
Status completion_status;
ASSERT_OK(second_txn->IsCommitComplete(&is_complete, &completion_status));
ASSERT_TRUE(completion_status.IsAborted()) << completion_status.ToString();
ASSERT_TRUE(is_complete);
});
}

// Test that when attempting to lock a transaction that is locked by an earlier
// transaction, we abort the newer transaction.
TEST_F(TxnOpDispatcherITest, BeginTxnLockAbort) {
Expand Down
24 changes: 24 additions & 0 deletions src/kudu/transactions/txn_system_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ DEFINE_bool(disable_txn_system_client_init, false,
"client connections.");
TAG_FLAG(disable_txn_system_client_init, unsafe);

DEFINE_int64(txn_system_client_op_timeout_ms, 10 * 1000,
"Op timeout used by the TxnSystemClient when making transactions-related "
"RPCs to the TxnStatusManager.");
TAG_FLAG(txn_system_client_op_timeout_ms, advanced);
TAG_FLAG(txn_system_client_op_timeout_ms, runtime);

DECLARE_int64(rpc_negotiation_timeout_ms);

using kudu::client::KuduClient;
Expand Down Expand Up @@ -185,6 +191,9 @@ Status TxnSystemClient::BeginTransaction(int64_t txn_id,
uint32_t* txn_keepalive_ms,
int64_t* highest_seen_txn_id,
MonoDelta timeout) {
if (!timeout.Initialized()) {
timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms);
}
CoordinatorOpPB coordinate_txn_op;
coordinate_txn_op.set_type(CoordinatorOpPB::BEGIN_TXN);
coordinate_txn_op.set_txn_id(txn_id);
Expand Down Expand Up @@ -213,6 +222,9 @@ Status TxnSystemClient::BeginTransaction(int64_t txn_id,

Status TxnSystemClient::RegisterParticipant(int64_t txn_id, const string& participant_id,
const string& user, MonoDelta timeout) {
if (!timeout.Initialized()) {
timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms);
}
CoordinatorOpPB coordinate_txn_op;
coordinate_txn_op.set_type(CoordinatorOpPB::REGISTER_PARTICIPANT);
coordinate_txn_op.set_txn_id(txn_id);
Expand All @@ -228,6 +240,9 @@ Status TxnSystemClient::RegisterParticipant(int64_t txn_id, const string& partic
Status TxnSystemClient::BeginCommitTransaction(int64_t txn_id,
const string& user,
MonoDelta timeout) {
if (!timeout.Initialized()) {
timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms);
}
CoordinatorOpPB coordinate_txn_op;
coordinate_txn_op.set_type(CoordinatorOpPB::BEGIN_COMMIT_TXN);
coordinate_txn_op.set_txn_id(txn_id);
Expand All @@ -242,6 +257,9 @@ Status TxnSystemClient::BeginCommitTransaction(int64_t txn_id,
Status TxnSystemClient::AbortTransaction(int64_t txn_id,
const string& user,
MonoDelta timeout) {
if (!timeout.Initialized()) {
timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms);
}
CoordinatorOpPB coordinate_txn_op;
coordinate_txn_op.set_type(CoordinatorOpPB::ABORT_TXN);
coordinate_txn_op.set_txn_id(txn_id);
Expand All @@ -257,6 +275,9 @@ Status TxnSystemClient::GetTransactionStatus(int64_t txn_id,
const string& user,
TxnStatusEntryPB* txn_status,
MonoDelta timeout) {
if (!timeout.Initialized()) {
timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms);
}
DCHECK(txn_status);
CoordinatorOpPB coordinate_txn_op;
coordinate_txn_op.set_type(CoordinatorOpPB::GET_TXN_STATUS);
Expand Down Expand Up @@ -284,6 +305,9 @@ Status TxnSystemClient::GetTransactionStatus(int64_t txn_id,
Status TxnSystemClient::KeepTransactionAlive(int64_t txn_id,
const string& user,
MonoDelta timeout) {
if (!timeout.Initialized()) {
timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms);
}
CoordinatorOpPB coordinate_txn_op;
coordinate_txn_op.set_type(CoordinatorOpPB::KEEP_TXN_ALIVE);
coordinate_txn_op.set_txn_id(txn_id);
Expand Down
12 changes: 6 additions & 6 deletions src/kudu/transactions/txn_system_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,37 +102,37 @@ class TxnSystemClient {
std::string& user,
uint32_t* txn_keepalive_ms = nullptr,
int64_t* highest_seen_txn_id = nullptr,
MonoDelta timeout = MonoDelta::FromSeconds(10));
MonoDelta timeout = MonoDelta());

// Attempts to register the given participant with the given transaction.
// Returns an error if the transaction hasn't yet been started, or if the
// 'user' isn't permitted to modify the transaction.
Status RegisterParticipant(int64_t txn_id, const std::string& participant_id,
const std::string& user,
MonoDelta timeout = MonoDelta::FromSeconds(10));
MonoDelta timeout = MonoDelta());

// Initiates committing a transaction with the given identifier.
Status BeginCommitTransaction(int64_t txn_id,
const std::string& user,
MonoDelta timeout = MonoDelta::FromSeconds(10));
MonoDelta timeout = MonoDelta());

// Aborts a transaction with the given identifier.
Status AbortTransaction(int64_t txn_id,
const std::string& user,
MonoDelta timeout = MonoDelta::FromSeconds(10));
MonoDelta timeout = MonoDelta());

// Retrieves transactions status. On success, returns Status::OK() and stores
// the result status in the 'txn_status' output parameter. On failure,
// returns corresponding Status.
Status GetTransactionStatus(int64_t txn_id,
const std::string& user,
TxnStatusEntryPB* txn_status,
MonoDelta timeout = MonoDelta::FromSeconds(10));
MonoDelta timeout = MonoDelta());

// Send keep-alive heartbeat for the specified transaction as the given user.
Status KeepTransactionAlive(int64_t txn_id,
const std::string& user,
MonoDelta timeout = MonoDelta::FromSeconds(10));
MonoDelta timeout = MonoDelta());

// Opens the transaction status table, refreshing metadata with that from the
// masters.
Expand Down
14 changes: 12 additions & 2 deletions src/kudu/tserver/ts_tablet_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1903,8 +1903,18 @@ Status TSTabletManager::ScheduleAbortTxn(int64_t txn_id, const string& user) {
return txn_participant_registration_pool_->Submit(
[this, txn_id, tsc, user] () {
LOG(INFO) << Substitute("Sending abort request for transaction $0", txn_id);
WARN_NOT_OK(tsc->AbortTransaction(txn_id, user),
Substitute("Error aborting transaction $0 as user $1", txn_id, user));
auto s = tsc->AbortTransaction(txn_id, user);
if (s.IsTimedOut()) {
// Presumably this was a transient error. Try again.
{
std::lock_guard<simple_spinlock> l(txn_aborts_lock_);
txn_aborts_in_progress_.erase(txn_id);
}
WARN_NOT_OK(ScheduleAbortTxn(txn_id, user),
Substitute("Could not reschedule abort of transaction $0", txn_id));
return;
}
WARN_NOT_OK(s, Substitute("Error aborting transaction $0 as user $1", txn_id, user));
std::lock_guard<simple_spinlock> l(txn_aborts_lock_);
txn_aborts_in_progress_.erase(txn_id);
});
Expand Down

0 comments on commit 73310d9

Please sign in to comment.