Skip to content

Commit

Permalink
tserver: log a message when a scanner is not found
Browse files Browse the repository at this point in the history
Right now there is a class of issues where an "upstream" service or
query engine such as Impala or Spark may not be able to find a Kudu
scanner, resulting in a failed query. Often, this is because the scanner
timed out. The query engine will receive and typically log a message
indicating that the scanner could not be found; however, it's difficult
to trace this back to the original query because the client wouldn't
know the scanner id and the server would not log the event.

With this change, it will be much easier to match up query engine
failures with expiring scanners by looking at the logs on both sides
because when a request comes in for a scanner that cannot be found, the
client will get a bad status:

  Not found: Scanner 7672e46ed30d42938c54bf6e7e24946e not found (it may have expired)

And the server will log a slightly more verbose message:

  I0926 14:59:50.600463 21137 tablet_service.cc:2020] Scan: Not found: Scanner 7672e46ed30d42938c54bf6e7e24946e not found (it may have expired): call sequence id=1, remote={username='mpercy'} at 127.0.0.1:42660

Additionally, the above situation is handled in a similar way in the
case of a TabletService::ScannerKeepAlive() RPC call.

Change-Id: If4e9c69160605d4a839ac2d28def67f4d3402b90
Reviewed-on: http://gerrit.cloudera.org:8080/11516
Tested-by: Mike Percy <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
  • Loading branch information
mpercy committed Oct 2, 2018
1 parent 7f5f57f commit 6466c0d
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ public void testScannerExpiration() throws Exception {
scanner.nextRows();
fail("Exception was not thrown when accessing an expired scanner");
} catch (NonRecoverableException ex) {
assertThat(ex.getMessage(), containsString("Scanner not found"));
assertTrue("Expected Scanner not found error, got:\n" + ex.toString(),
ex.getMessage().matches(".*Scanner .* not found.*"));
}

// Closing an expired scanner shouldn't throw an exception.
Expand Down
3 changes: 2 additions & 1 deletion src/kudu/client/client-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1505,7 +1505,8 @@ TEST_F(ClientTest, TestNonFaultTolerantScannerExpired) {

// We should now get the appropriate error.
s = scanner.NextBatch(&batch);
EXPECT_EQ("Not found: Scanner not found", s.ToString());
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
ASSERT_STR_MATCHES(s.ToString(), "Scanner .* not found");

// It should not have performed any retries. Since we restarted the server above,
// we should see only one Scan RPC: the latest (failed) attempt above.
Expand Down
32 changes: 24 additions & 8 deletions src/kudu/tserver/tablet_server-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
#include "kudu/util/env.h"
#include "kudu/util/faststring.h"
#include "kudu/util/jsonwriter.h"
#include "kudu/util/logging_test_util.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/sockaddr.h"
Expand Down Expand Up @@ -1581,6 +1582,11 @@ TEST_P(ExpiredScannerParamTest, Test) {
// Initially, there have been no scanners, so none have expired.
ASSERT_EQ(0, scanners_expired->value());

// Capture the glog output so we can ensure the scanner expiration message
// gets logged.
StringVectorSink capture_logs;
ScopedRegisterSink reg(&capture_logs);

// Open a scanner but don't read from it.
ScanResponsePB resp;
ASSERT_NO_FATAL_FAILURE(OpenScannerWithAllColumns(&resp, mode));
Expand All @@ -1599,16 +1605,19 @@ TEST_P(ExpiredScannerParamTest, Test) {
ASSERT_OK(proxy_->Scan(req, &resp, &rpc));
ASSERT_TRUE(resp.has_error());
ASSERT_EQ(TabletServerErrorPB::SCANNER_EXPIRED, resp.error().code());
ASSERT_STR_MATCHES(resp.error().status().message(), "Scanner .* not found");

ASSERT_STRINGS_ANY_MATCH(capture_logs.logged_msgs(), "Scan: .* Scanner .* not found .* remote=");
}

const ReadMode read_modes[] = {
static const ReadMode kReadModes[] = {
READ_LATEST,
READ_AT_SNAPSHOT,
READ_YOUR_WRITES,
};

INSTANTIATE_TEST_CASE_P(Params, ExpiredScannerParamTest,
testing::ValuesIn(read_modes));
testing::ValuesIn(kReadModes));

class ScanCorruptedDeltasParamTest :
public TabletServerTest,
Expand Down Expand Up @@ -1682,7 +1691,7 @@ TEST_P(ScanCorruptedDeltasParamTest, Test) {
}

INSTANTIATE_TEST_CASE_P(Params, ScanCorruptedDeltasParamTest,
testing::ValuesIn(read_modes));
testing::ValuesIn(kReadModes));

class ScannerOpenWhenServerShutsDownParamTest :
public TabletServerTest,
Expand All @@ -1705,7 +1714,7 @@ TEST_P(ScannerOpenWhenServerShutsDownParamTest, Test) {
}

INSTANTIATE_TEST_CASE_P(Params, ScannerOpenWhenServerShutsDownParamTest,
testing::ValuesIn(read_modes));
testing::ValuesIn(kReadModes));

TEST_F(TabletServerTest, TestSnapshotScan) {
const int num_rows = AllowSlowTests() ? 1000 : 100;
Expand Down Expand Up @@ -2358,7 +2367,7 @@ TEST_P(InvalidScanRequest_NewScanAndScannerIDParamTest, Test) {
}

INSTANTIATE_TEST_CASE_P(Params, InvalidScanRequest_NewScanAndScannerIDParamTest,
testing::ValuesIn(read_modes));
testing::ValuesIn(kReadModes));

// Test that passing a projection with fields not present in the tablet schema
// throws an exception.
Expand Down Expand Up @@ -2426,7 +2435,7 @@ TEST_P(InvalidScanRequest_WithIdsParamTest, Test) {
}

INSTANTIATE_TEST_CASE_P(Params, InvalidScanRequest_WithIdsParamTest,
testing::ValuesIn(read_modes));
testing::ValuesIn(kReadModes));

// Test scanning a tablet that has no entries.
TEST_F(TabletServerTest, TestScan_NoResults) {
Expand Down Expand Up @@ -2503,20 +2512,27 @@ TEST_P(InvalidScanSeqIdParamTest, Test) {
}

INSTANTIATE_TEST_CASE_P(Params, InvalidScanSeqIdParamTest,
testing::ValuesIn(read_modes));
testing::ValuesIn(kReadModes));

// Regression test for KUDU-1789: when ScannerKeepAlive is called on a non-existent
// scanner, it should properly respond with an error.
TEST_F(TabletServerTest, TestScan_KeepAliveExpiredScanner) {
// Capture the glog output so the server-side "not found" message can be
// checked at the end of the test.
StringVectorSink capture_logs;
ScopedRegisterSink reg(&capture_logs);

ScannerKeepAliveRequestPB req;
ScannerKeepAliveResponsePB resp;
RpcController rpc;

// Send a keep-alive for a scanner ID that this test never opened.
rpc.set_timeout(MonoDelta::FromSeconds(5));
req.set_scanner_id("does-not-exist");
// The RPC call itself is expected to succeed; the failure is reported through
// the error field of the response.
ASSERT_OK(proxy_->ScannerKeepAlive(req, &resp, &rpc));
ASSERT_TRUE(resp.has_error());
ASSERT_TRUE(resp.has_error()) << SecureShortDebugString(resp);
ASSERT_EQ(resp.error().code(), TabletServerErrorPB::SCANNER_EXPIRED);
// The status message is matched loosely ("Scanner <anything> not found").
ASSERT_STR_MATCHES(resp.error().status().message(), "Scanner .* not found");

// At least one captured log line must mention the ScannerKeepAlive failure
// along with a "remote=" requestor field.
ASSERT_STRINGS_ANY_MATCH(capture_logs.logged_msgs(),
"ScannerKeepAlive: .* Scanner .* not found .* remote=");
}

void TabletServerTest::ScanYourWritesTest(uint64_t propagated_timestamp,
Expand Down
29 changes: 19 additions & 10 deletions src/kudu/tserver/tablet_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1265,7 +1265,11 @@ void TabletServiceImpl::ScannerKeepAlive(const ScannerKeepAliveRequestPB *req,
SharedScanner scanner;
if (!server_->scanner_manager()->LookupScanner(req->scanner_id(), &scanner)) {
resp->mutable_error()->set_code(TabletServerErrorPB::SCANNER_EXPIRED);
StatusToPB(Status::NotFound("Scanner not found"), resp->mutable_error()->mutable_status());
Status s = Status::NotFound(Substitute("Scanner $0 not found (it may have expired)",
req->scanner_id()));
StatusToPB(s, resp->mutable_error()->mutable_status());
LOG(INFO) << Substitute("ScannerKeepAlive: $0: remote=$1",
s.ToString(), context->requestor_string());
context->RespondSuccess();
return;
}
Expand Down Expand Up @@ -1328,7 +1332,7 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
resp->set_snap_timestamp(scan_timestamp.ToUint64());
}
} else if (req->has_scanner_id()) {
Status s = HandleContinueScanRequest(req, &collector, &has_more_results, &error_code);
Status s = HandleContinueScanRequest(req, context, &collector, &has_more_results, &error_code);
if (PREDICT_FALSE(!s.ok())) {
SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
return;
Expand Down Expand Up @@ -1544,7 +1548,7 @@ void TabletServiceImpl::Checksum(const ChecksumRequestPB* req,
const ContinueChecksumRequestPB& continue_req = req->continue_request();
collector.set_agg_checksum(continue_req.previous_checksum());
scan_req.set_scanner_id(continue_req.scanner_id());
Status s = HandleContinueScanRequest(&scan_req, &collector, &has_more, &error_code);
Status s = HandleContinueScanRequest(&scan_req, context, &collector, &has_more, &error_code);
if (PREDICT_FALSE(!s.ok())) {
SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
return;
Expand Down Expand Up @@ -1978,8 +1982,8 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
// and call the second half directly
ScanRequestPB continue_req(*req);
continue_req.set_scanner_id(scanner->id());
RETURN_NOT_OK(HandleContinueScanRequest(&continue_req, result_collector, has_more_results,
error_code));
RETURN_NOT_OK(HandleContinueScanRequest(&continue_req, rpc_context, result_collector,
has_more_results, error_code));
} else {
// Increment the scanner call sequence ID. HandleContinueScanRequest handles
// this in the non-empty scan case.
Expand All @@ -1990,6 +1994,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,

// Continue an existing scan request.
Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
const RpcContext* rpc_context,
ScanResultCollector* result_collector,
bool* has_more_results,
TabletServerErrorPB::Code* error_code) {
Expand All @@ -1999,18 +2004,22 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,

size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req);

// TODO: need some kind of concurrency control on these scanner objects
// TODO(todd): need some kind of concurrency control on these scanner objects
// in case multiple RPCs hit the same scanner at the same time. Probably
// just a trylock and fail the RPC if it contends.
SharedScanner scanner;
if (!server_->scanner_manager()->LookupScanner(req->scanner_id(), &scanner)) {
if (batch_size_bytes == 0 && req->close_scanner()) {
// A request to close a non-existent scanner.
// Silently ignore any request to close a non-existent scanner.
return Status::OK();
} else {
*error_code = TabletServerErrorPB::SCANNER_EXPIRED;
return Status::NotFound("Scanner not found");
}

*error_code = TabletServerErrorPB::SCANNER_EXPIRED;
Status s = Status::NotFound(Substitute("Scanner $0 not found (it may have expired)",
req->scanner_id()));
LOG(INFO) << Substitute("Scan: $0: call sequence id=$1, remote=$2",
s.ToString(), req->call_seq_id(), rpc_context->requestor_string());
return s;
}

// Set the row format flags on the ScanResultCollector.
Expand Down
1 change: 1 addition & 0 deletions src/kudu/tserver/tablet_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class TabletServiceImpl : public TabletServerServiceIf {
TabletServerErrorPB::Code* error_code);

Status HandleContinueScanRequest(const ScanRequestPB* req,
const rpc::RpcContext* rpc_context,
ScanResultCollector* result_collector,
bool* has_more_results,
TabletServerErrorPB::Code* error_code);
Expand Down

0 comments on commit 6466c0d

Please sign in to comment.