Skip to content

Commit

Permalink
Fix bug in incorrect response rebuilding on tablet bootstrap
Browse files Browse the repository at this point in the history
This fixes the bug described in: https://gerrit.cloudera.org/#/c/5417/
... and enables the test disabled in that patch.

Along the way this also performs some cleanup of tablet bootstrap.

I ran raft_consensus-itest's TestInsertDuplicateKeysWithCrashyNodes
on dist-test for 5000 loops with the following config:

KUDU_ALLOW_SLOW_TESTS=1 ../../build-support/dist_test.py \
--disable-sharding loop -n 5000 -- bin/raft_consensus-itest \
--gtest_filter=*DuplicateKeysWithCrashyNodes

Prior to this patch the test failed 62/5000:
http://dist-test.cloudera.org//job?job_id=david.alves.1493915326.763

After this patch the test passes 5000/5000:
http://dist-test.cloudera.org//job?job_id=david.alves.1493914745.27867

Change-Id: I1219ed5f7835e93cd7f3b128cedd650fc3384482
Reviewed-on: http://gerrit.cloudera.org:8080/5489
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <[email protected]>
  • Loading branch information
dralves authored and toddlipcon committed May 5, 2017
1 parent 30d9e72 commit 4df9f59
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 115 deletions.
8 changes: 4 additions & 4 deletions src/kudu/tablet/row_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ string RowOp::ToString(const Schema& schema) const {
return decoded_op.ToString(schema);
}

void RowOp::SetAlreadyFlushed() {
DCHECK(!result) << SecureDebugString(*result);
result.reset(new OperationResultPB());
result->set_flushed(true);
void RowOp::SetSkippedResult(const OperationResultPB& result) {
DCHECK(!this->result) << SecureDebugString(*this->result);
DCHECK(result.skip_on_replay());
this->result.reset(new OperationResultPB(result));
}

} // namespace tablet
Expand Down
9 changes: 6 additions & 3 deletions src/kudu/tablet/row_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,18 @@ namespace tablet {
struct RowOp {
public:
explicit RowOp(DecodedRowOperation decoded_op);
RowOp();
~RowOp();

// Functions to set the result of the mutation.
// Only one of the following three functions must be called,
// at most once.
// Only one of the following four functions must be called, at most once.
void SetFailed(const Status& s);
void SetInsertSucceeded(int mrs_id);
void SetMutateSucceeded(gscoped_ptr<OperationResultPB> result);
void SetAlreadyFlushed();
// Sets the result of a skipped operation on bootstrap.
// TODO(dralves) Currently this performs a copy. Might be avoided with some refactoring.
// see TODO(dralves) in TabletBoostrap::ApplyOperations().
void SetSkippedResult(const OperationResultPB& result);

// In the case that this operation is being replayed from the WAL
// during tablet bootstrap, we may need to look at the original result
Expand Down
5 changes: 3 additions & 2 deletions src/kudu/tablet/tablet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ message MemStoreTargetPB {
// Stores the result of an Insert or Mutate.
message OperationResultPB {

// set on replay if this operation was already flushed.
optional bool flushed = 1 [ default = false ];
// set on replay to reflect that this operation was already flushed or had previously
// failed and should not be applied again.
optional bool skip_on_replay = 1 [ default = false ];

// set if this particular operation failed
optional kudu.AppStatusPB failed_status = 2;
Expand Down
214 changes: 109 additions & 105 deletions src/kudu/tablet/tablet_bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -248,32 +248,46 @@ class TabletBootstrap {
Status PlayNoOpRequest(ReplicateMsg* replicate_msg,
const CommitMsg& commit_msg);

// Plays operations, skipping those that have already been flushed,
// as indicated in the 'already_flushed' vector.
// Plays operations, skipping those that have already been flushed or have previously failed.
// See ApplyRowOperations() for more details on how the decision of whether an operation
// is applied or skipped is made.
Status PlayRowOperations(WriteTransactionState* tx_state,
const SchemaPB& schema_pb,
const RowOperationsPB& ops_pb,
const TxResultPB& result,
const vector<bool>& already_flushed);
const TxResultPB& orig_result,
TxResultPB* new_result);

// Determine which of the operations from 'result' correspond to already-flushed stores.
// Determine which of the operations from 'orig_result' must be skipped.
// At the same time this builds the WriteResponsePB that we'll store on the ResultTracker.
Status DetermineFlushedOpsAndBuildResponse(const TxResultPB& result,
vector<bool>* flushed_by_op,
WriteResponsePB* response);

// Pass through all of the decoded operations in tx_state. For
// each op:
// 'new_result' store the results of the operations that were skipped, 'response' stores
// any error that might have previously happened so that we can send them back to clients,
// if needed.
// Finally 'all_skipped' indicates whether all of the original operations were skipped.
Status DetermineSkippedOpsAndBuildResponse(const TxResultPB& orig_result,
TxResultPB* new_result,
WriteResponsePB* response,
bool* all_skipped);

// Pass through all of the decoded operations in tx_state. For each op:
// - if it was previously failed, mark as failed
// - if it previously succeeded but was flushed, skip it.
// - otherwise, re-apply to the tablet being bootstrapped.
Status ApplyOperations(WriteTransactionState* tx_state,
const TxResultPB& orig_result);
const TxResultPB& orig_result,
TxResultPB* new_result);

enum OpAction {
// The operation was never applied or was applied to an unflushed memory store and thus
// needs to be applied again.
NEEDS_REPLAY,
// The operation was already applied to a memory store that was flushed.
SKIP_PREVIOUSLY_FLUSHED,
// The operation was never applied due to an error.
SKIP_PREVIOUSLY_FAILED
};

// Filter a row operation, setting '*already_flushed' to indicate if
// it was already flushed.
// Filter a row operation, setting 'action' to indicate what needs to be done
// to the operation, i.e. whether it must applied or skipped.
Status FilterOperation(const OperationResultPB& op_result,
bool* already_flushed);
OpAction* action);

enum ActiveStores {
// The OperationResultPBs in the commit message do not reference any stores.
Expand Down Expand Up @@ -1242,22 +1256,38 @@ Status TabletBootstrap::AppendCommitMsg(const CommitMsg& commit_msg) {
return log_->Append(&commit_entry);
}

Status TabletBootstrap::DetermineFlushedOpsAndBuildResponse(const TxResultPB& result,
vector<bool>* flushed_by_op,
WriteResponsePB* response) {
int num_ops = result.ops_size();
flushed_by_op->resize(num_ops);
Status TabletBootstrap::DetermineSkippedOpsAndBuildResponse(const TxResultPB& orig_result,
TxResultPB* new_result,
WriteResponsePB* response,
bool* all_skipped) {
int num_ops = orig_result.ops_size();
new_result->mutable_ops()->Reserve(num_ops);
*all_skipped = true;

for (int i = 0; i < num_ops; i++) {
const auto& orig_op_result = result.ops(i);
if (orig_op_result.has_failed_status() && response) {
WriteResponsePB::PerRowErrorPB* error = response->add_per_row_errors();
error->set_row_index(i);
error->mutable_error()->CopyFrom(orig_op_result.failed_status());
const auto& orig_op_result = orig_result.ops(i);
OpAction action;
RETURN_NOT_OK(FilterOperation(orig_op_result, &action));
*all_skipped &= action != NEEDS_REPLAY;

if (action != NEEDS_REPLAY) {
new_result->mutable_ops(i)->set_skip_on_replay(true);
}

if (action == SKIP_PREVIOUSLY_FAILED) {
if (response) {
WriteResponsePB::PerRowErrorPB* error = response->add_per_row_errors();
error->set_row_index(i);
error->mutable_error()->CopyFrom(orig_op_result.failed_status());
}
// If the op is already flushed we won't be applying it.
DCHECK(orig_op_result.has_failed_status());
new_result->mutable_ops(i)->mutable_failed_status()->CopyFrom(orig_op_result.failed_status());
}
bool f;
RETURN_NOT_OK(FilterOperation(orig_op_result, &f));
(*flushed_by_op)[i] = f;
}

if (*all_skipped) {
stats_.ops_ignored++;
}
return Status::OK();
}
Expand All @@ -1267,8 +1297,8 @@ Status TabletBootstrap::PlayWriteRequest(ReplicateMsg* replicate_msg,
// Prepare the commit entry for the rewritten log.
LogEntryPB commit_entry;
commit_entry.set_type(log::COMMIT);
CommitMsg* commit = commit_entry.mutable_commit();
commit->CopyFrom(commit_msg);
CommitMsg* new_commit = commit_entry.mutable_commit();
new_commit->CopyFrom(commit_msg);

// Set up the new transaction.
// Even if we're going to ignore the transaction, it's important to
Expand Down Expand Up @@ -1313,45 +1343,30 @@ Status TabletBootstrap::PlayWriteRequest(ReplicateMsg* replicate_msg,
// storage and don't need to be re-applied. We can do this even before
// we decode any row operations, so we can short-circuit that decoding
// in the case that the entire op has been already flushed.
vector<bool> already_flushed;
RETURN_NOT_OK(DetermineFlushedOpsAndBuildResponse(commit_msg.result(),
&already_flushed,
response.get()));
TxResultPB* new_result = new_commit->mutable_result();
bool all_flushed;
RETURN_NOT_OK(DetermineSkippedOpsAndBuildResponse(commit_msg.result(),
new_result,
response.get(),
&all_flushed));

if (tracking_results && state == ResultTracker::NEW) {
result_tracker_->RecordCompletionAndRespond(replicate_msg->request_id(), response.get());
}

Status play_status;
bool all_already_flushed = std::all_of(already_flushed.begin(),
already_flushed.end(),
[](bool f) { return f; });
if (all_already_flushed) {
stats_.ops_ignored++;
for (auto& op : *commit->mutable_result()->mutable_ops()) {
op.Clear();
op.set_flushed(true);
}
} else {
if (write->has_row_operations()) {
// TODO(todd): get rid of redundant params below - they can be gotten from the Request
// Rather than RETURN_NOT_OK() here, we need to just save the status and do the
// RETURN_NOT_OK() down below the Commit() call below. Even though it seems wrong
// to commit the transaction when in fact it failed to apply, we would throw a CHECK
// failure if we attempted to 'Abort()' after entering the applying stage. Allowing it to
// Commit isn't problematic because we don't expose the results anyway, and the bad
// Status returned below will cause us to fail the entire tablet bootstrap anyway.
play_status = PlayRowOperations(&tx_state,
write->schema(),
write->row_operations(),
commit_msg.result(),
already_flushed);
}
if (!all_flushed && write->has_row_operations()) {
// Rather than RETURN_NOT_OK() here, we need to just save the status and do the
// RETURN_NOT_OK() down below the Commit() call below. Even though it seems wrong
// to commit the transaction when in fact it failed to apply, we would throw a CHECK
// failure if we attempted to 'Abort()' after entering the applying stage. Allowing it to
// Commit isn't problematic because we don't expose the results anyway, and the bad
// Status returned below will cause us to fail the entire tablet bootstrap anyway.
play_status = PlayRowOperations(&tx_state, commit_msg.result(), new_result);

if (play_status.ok()) {
// Replace the original commit message's result with the new one from
// the replayed operation.
tx_state.ReleaseTxResultPB(commit->mutable_result());
// Replace the original commit message's result with the new one from the replayed operation.
tx_state.ReleaseTxResultPB(new_commit->mutable_result());
}
}

Expand Down Expand Up @@ -1424,43 +1439,33 @@ Status TabletBootstrap::PlayNoOpRequest(ReplicateMsg* replicate_msg, const Commi
}

Status TabletBootstrap::PlayRowOperations(WriteTransactionState* tx_state,
const SchemaPB& schema_pb,
const RowOperationsPB& ops_pb,
const TxResultPB& result,
const vector<bool>& already_flushed) {
const TxResultPB& orig_result,
TxResultPB* new_result) {
Schema inserts_schema;
RETURN_NOT_OK_PREPEND(SchemaFromPB(schema_pb, &inserts_schema),
RETURN_NOT_OK_PREPEND(SchemaFromPB(tx_state->request()->schema(), &inserts_schema),
"Couldn't decode client schema");

RETURN_NOT_OK_PREPEND(tablet_->DecodeWriteOperations(&inserts_schema, tx_state),
Substitute("Could not decode row operations: $0",
SecureShortDebugString(ops_pb)));
DCHECK_EQ(tx_state->row_ops().size(), already_flushed.size());

// Propagate the 'already_flushed' information into the decoded operations.
// This signals to ApplyOperations() below that it doesn't need to actually
// apply these ops again.
for (int i = 0; i < already_flushed.size(); i++) {
if (already_flushed[i]) {
tx_state->row_ops()[i]->SetAlreadyFlushed();
}
}
SecureDebugString(tx_state->request()->row_operations())));

// Run AcquireRowLocks, Apply, etc!
RETURN_NOT_OK_PREPEND(tablet_->AcquireRowLocks(tx_state),
"Failed to acquire row locks");

RETURN_NOT_OK(ApplyOperations(tx_state, result));
RETURN_NOT_OK(ApplyOperations(tx_state, orig_result, new_result));

return Status::OK();
}

Status TabletBootstrap::ApplyOperations(WriteTransactionState* tx_state,
const TxResultPB& orig_result) {
const TxResultPB& orig_result,
TxResultPB* new_result) {
DCHECK_EQ(tx_state->row_ops().size(), orig_result.ops_size());
DCHECK_EQ(tx_state->row_ops().size(), new_result->ops_size());
int32_t op_idx = 0;
for (RowOp* op : tx_state->row_ops()) {
const OperationResultPB& orig_op_result = orig_result.ops(op_idx++);

int32_t curr_op_idx = op_idx++;
// Increment the seen/ignored stats.
switch (op->decoded_op.type) {
case RowOperationsPB::INSERT:
Expand All @@ -1485,26 +1490,19 @@ Status TabletBootstrap::ApplyOperations(WriteTransactionState* tx_state,
break;
}

// If the op is already flushed, no need to replay it.
if (op->has_result()) {
DCHECK(op->result->flushed());
const OperationResultPB& new_op_result = new_result->ops(curr_op_idx);
// If the op is already flushed or had previously failed, no need to replay it.
// TODO(dralves) this back and forth is weird. We're first setting the flushed/failed
// status on the rewritten message's commit entry. Then we pass it here to
// set the status on the op, then we set it back on the commit entry with
// ReleaseTxResultPB(). This could be simplified if we build the RowOps on
// demand and just created DecodedRowOperation/RowOp for the replayed stuff.
if (new_op_result.skip_on_replay()) {
op->SetSkippedResult(new_op_result);
continue;
}

op->set_original_result_from_log(&orig_op_result);

// check if the operation failed in the original transaction
if (PREDICT_FALSE(orig_op_result.has_failed_status())) {
Status status = StatusFromPB(orig_op_result.failed_status());
if (VLOG_IS_ON(1)) {
VLOG_WITH_PREFIX(1) << "Skipping operation that originally resulted in error. OpId: "
<< SecureDebugString(tx_state->op_id()) << " op index: "
<< op_idx - 1 << " original error: "
<< status.ToString();
}
op->SetFailed(status);
continue;
}
op->set_original_result_from_log(&orig_result.ops(curr_op_idx));

// Actually apply it.
ProbeStats stats; // we don't use this, but tablet internals require non-NULL.
Expand All @@ -1527,10 +1525,16 @@ Status TabletBootstrap::ApplyOperations(WriteTransactionState* tx_state,
}

Status TabletBootstrap::FilterOperation(const OperationResultPB& op_result,
bool* already_flushed) {
OpAction* action) {

// If the operation failed or was skipped, originally, no need to re-apply it.
if (op_result.has_failed_status() || op_result.flushed()) {
*already_flushed = true;
if (op_result.has_failed_status()) {
*action = SKIP_PREVIOUSLY_FAILED;
return Status::OK();
}

if (op_result.skip_on_replay()) {
*action = SKIP_PREVIOUSLY_FLUSHED;
return Status::OK();
}

Expand All @@ -1554,7 +1558,7 @@ Status TabletBootstrap::FilterOperation(const OperationResultPB& op_result,

if (num_active_stores == 0) {
// The mutation was fully flushed.
*already_flushed = true;
*action = SKIP_PREVIOUSLY_FLUSHED;
return Status::OK();
}

Expand All @@ -1568,7 +1572,7 @@ Status TabletBootstrap::FilterOperation(const OperationResultPB& op_result,
SecureShortDebugString(op_result));
}

*already_flushed = false;
*action = NEEDS_REPLAY;
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion src/kudu/tserver/tablet_server-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@ TEST_F(TabletServerTest, TestKUDU_1341) {
ANFF(VerifyRows(schema_, { KeyValue(1, 12345) }));
}

TEST_F(TabletServerTest, DISABLED_TestExactlyOnceForErrorsAcrossRestart) {
TEST_F(TabletServerTest, TestExactlyOnceForErrorsAcrossRestart) {
WriteRequestPB req;
WriteResponsePB resp;
RpcController rpc;
Expand Down

0 comments on commit 4df9f59

Please sign in to comment.