Skip to content

Commit

Permalink
IMPALA-7985: Port RemoteShutdown() to KRPC.
Browse files Browse the repository at this point in the history
The :shutdown command is used to shut down a remote server. The common
case is that a user specifies the impalad to shut down by specifying a
host, e.g. :shutdown('host100'). If a user has more than one impalad on a
remote host, then the form :shutdown('<host>:<port>') can be used to
specify the port by which the impalad can be contacted. Prior to
IMPALA-7985 this port was the backend port, e.g.
:shutdown('host100:22000'). With IMPALA-7985 the port to use is the KRPC
port, e.g. :shutdown('host100:27000').

Shutdown is implemented by making an rpc call to the target impalad.
This changes the implementation of this call to use KRPC.

To aid the user in finding the KRPC port, the KRPC address is added to
the /backends section of the debug web page.

We attempt to detect the case where :shutdown is pointed at a thrift
port (like the backend port) and print an informative message.

Documentation of this change will be done in IMPALA-8098.
Further improvements to DoRpcWithRetry() will be done in IMPALA-8143.

For discussion of why it was chosen to implement this change in an
incompatible way, see comments in
https://issues.apache.org/jira/browse/IMPALA-7985.

TESTING

Ran all end-to-end tests.
Enhanced the test for /backends in test_web_pages.py.
In test_restart_services.py, added a call to the old backend port to the
test. Some expected error messages were changed in line with what KRPC
returns.

Change-Id: I4fd00ee4e638f5e71e27893162fd65501ef9e74e
Reviewed-on: http://gerrit.cloudera.org:8080/12260
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
  • Loading branch information
bartash authored and cloudera-hudson committed Feb 9, 2019
1 parent 0185b6e commit 31c6e2a
Show file tree
Hide file tree
Showing 16 changed files with 229 additions and 150 deletions.
8 changes: 0 additions & 8 deletions be/src/runtime/backend-client.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,6 @@ class ImpalaBackendClient : public ImpalaInternalServiceClient {
ImpalaInternalServiceClient::recv_PublishFilter(_return);
}

// Sends the RemoteShutdown request described by 'params' and then waits for the
// reply, which is written to '_return'.
// '*send_done' must be false on entry (enforced by the DCHECK). It is set to true
// as soon as send_RemoteShutdown() has returned, i.e. before the receive is attempted.
// NOTE(review): presumably this lets the caller distinguish a failure while sending
// from a failure while receiving -- confirm against the caller.
void RemoteShutdown(TRemoteShutdownResult& _return, const TRemoteShutdownParams& params,
bool* send_done) {
DCHECK(!*send_done);
ImpalaInternalServiceClient::send_RemoteShutdown(params);
*send_done = true;
ImpalaInternalServiceClient::recv_RemoteShutdown(_return);
}

#pragma clang diagnostic pop

private:
Expand Down
21 changes: 2 additions & 19 deletions be/src/runtime/coordinator-backend-state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -417,23 +417,6 @@ void Coordinator::BackendState::UpdateExecStats(
}
}

// Invokes 'rpc_call' until it succeeds, making at most three attempts. Every attempt
// gets its own RpcController with a 10 second timeout. Before each attempt
// 'debug_action' is handed to DebugAction() so that failures can be injected; an
// injected failure uses up the attempt without issuing the RPC. The kudu::Status
// returned by 'rpc_call' is converted with FromKuduStatus() using 'error_msg'.
// Returns the status of the last attempt made.
template <typename F>
Status Coordinator::BackendState::DoRrpcWithRetry(
    F&& rpc_call, const char* debug_action, const char* error_msg) {
  constexpr int kMaxAttempts = 3;
  constexpr int kTimeoutSecs = 10;

  Status last_status;
  int attempts_left = kMaxAttempts;
  while (attempts_left-- > 0) {
    RpcController controller;
    controller.set_timeout(MonoDelta::FromSeconds(kTimeoutSecs));

    // Give fault injection a chance to fail this attempt before the RPC is sent.
    last_status = DebugAction(query_ctx().client_request.query_options, debug_action);
    if (last_status.ok()) {
      last_status = FromKuduStatus(rpc_call(&controller), error_msg);
      if (last_status.ok()) return last_status;
    }
  }
  return last_status;
}

bool Coordinator::BackendState::Cancel() {
unique_lock<mutex> l(lock_);

Expand Down Expand Up @@ -472,8 +455,8 @@ bool Coordinator::BackendState::Cancel() {
return proxy->CancelQueryFInstances(request, &response, rpc_controller);
};

Status rpc_status = DoRrpcWithRetry(
cancel_rpc, "COORD_CANCEL_QUERY_FINSTANCES_RPC", "Cancel() RPC failed");
Status rpc_status = ControlService::DoRpcWithRetry(cancel_rpc, query_ctx(),
"COORD_CANCEL_QUERY_FINSTANCES_RPC", "Cancel() RPC failed", 3, 10);

if (!rpc_status.ok()) {
status_.MergeStatus(rpc_status);
Expand Down
5 changes: 0 additions & 5 deletions be/src/runtime/coordinator-backend-state.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,6 @@ class Coordinator::BackendState {

/// Same as ComputeResourceUtilization() but caller must hold lock.
ResourceUtilization ComputeResourceUtilizationLocked();

/// Retry the Rpc 'rpc_call' up to 3 times.
/// Pass 'debug_action' to DebugAction() to potentially inject errors.
template <typename F>
Status DoRrpcWithRetry(F&& rpc_call, const char* debug_action, const char* error_msg);
};

/// Per fragment execution statistics.
Expand Down
79 changes: 59 additions & 20 deletions be/src/service/client-request-state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
#include <limits>
#include <gutil/strings/substitute.h>

#include "exec/kudu-util.h"
#include "kudu/rpc/rpc_controller.h"
#include "runtime/backend-client.h"
#include "runtime/coordinator.h"
#include "runtime/exec-env.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/exec-env.h"
#include "scheduling/admission-controller.h"
#include "scheduling/scheduler.h"
#include "service/frontend.h"
Expand All @@ -40,19 +42,23 @@

#include "gen-cpp/CatalogService.h"
#include "gen-cpp/CatalogService_types.h"
#include "gen-cpp/control_service.pb.h"
#include "gen-cpp/control_service.proxy.h"

#include <thrift/Thrift.h>

#include "common/names.h"
#include "control-service.h"

using boost::algorithm::iequals;
using boost::algorithm::join;
using kudu::rpc::RpcController;
using namespace apache::hive::service::cli::thrift;
using namespace apache::thrift;
using namespace beeswax;
using namespace strings;

DECLARE_int32(be_port);
DECLARE_int32(krpc_port);
DECLARE_int32(catalog_service_port);
DECLARE_string(catalog_service_host);
DECLARE_int64(max_result_cache_size);
Expand Down Expand Up @@ -630,39 +636,72 @@ Status ClientRequestState::ExecDdlRequest() {

Status ClientRequestState::ExecShutdownRequest() {
const TShutdownParams& request = exec_request_.admin_request.shutdown_params;
int port = request.__isset.backend && request.backend.port != 0 ? request.backend.port :
FLAGS_be_port;
bool backend_port_specified = request.__isset.backend && request.backend.port != 0;
int port = backend_port_specified ? request.backend.port : FLAGS_krpc_port;
// Use the local shutdown code path if the host is unspecified or if it exactly matches
// the configured host/port. This avoids the possibility of RPC errors preventing
// shutdown.
if (!request.__isset.backend
|| (request.backend.hostname == FLAGS_hostname && port == FLAGS_be_port)) {
TShutdownStatus shutdown_status;
|| (request.backend.hostname == FLAGS_hostname && port == FLAGS_krpc_port)) {
ShutdownStatusPB shutdown_status;
int64_t deadline_s = request.__isset.deadline_s ? request.deadline_s : -1;
RETURN_IF_ERROR(parent_server_->StartShutdown(deadline_s, &shutdown_status));
SetResultSet({ImpalaServer::ShutdownStatusToString(shutdown_status)});
return Status::OK();
}
TNetworkAddress addr = MakeNetworkAddress(request.backend.hostname, port);

TRemoteShutdownParams params;
if (request.__isset.deadline_s) params.__set_deadline_s(request.deadline_s);
TRemoteShutdownResult resp;
// KRPC relies on resolved IP address, so convert hostname.
IpAddr ip_address;
Status ip_status = HostnameToIpAddr(request.backend.hostname, &ip_address);
if (!ip_status.ok()) {
VLOG(1) << "Could not convert hostname " << request.backend.hostname
<< " to ip address, error: " << ip_status.GetDetail();
return ip_status;
}
TNetworkAddress addr = MakeNetworkAddress(ip_address, port);

std::unique_ptr<ControlServiceProxy> proxy;
Status get_proxy_status = ControlService::GetProxy(addr, addr.hostname, &proxy);
if (!get_proxy_status.ok()) {
return Status(
Substitute("Could not get Proxy to ControlService at $0 with error: $1.",
TNetworkAddressToString(addr), get_proxy_status.msg().msg()));
}

RemoteShutdownParamsPB params;
if (request.__isset.deadline_s) params.set_deadline_s(request.deadline_s);
RemoteShutdownResultPB resp;
VLOG_QUERY << "Sending Shutdown RPC to " << TNetworkAddressToString(addr);
ImpalaBackendConnection::RpcStatus rpc_status = ImpalaBackendConnection::DoRpcWithRetry(
ExecEnv::GetInstance()->impalad_client_cache(), addr,
&ImpalaBackendClient::RemoteShutdown, params,
[this]() { return DebugAction(query_options(), "CRS_SHUTDOWN_RPC"); }, &resp);
if (!rpc_status.status.ok()) {

auto shutdown_rpc = [&](RpcController* rpc_controller) -> kudu::Status {
return proxy->RemoteShutdown(params, &resp, rpc_controller);
};

Status rpc_status = ControlService::DoRpcWithRetry(
shutdown_rpc, query_ctx_, "CRS_SHUTDOWN_RPC", "RemoteShutdown() RPC failed", 3, 10);

if (!rpc_status.ok()) {
const string& msg = rpc_status.msg().msg();
VLOG_QUERY << "RemoteShutdown query_id= " << PrintId(query_id())
<< " failed to send RPC to " << TNetworkAddressToString(addr) << " :"
<< rpc_status.status.msg().msg();
return rpc_status.status;
<< msg;
string err_string = Substitute(
"Rpc to $0 failed with error '$1'", TNetworkAddressToString(addr), msg);
// Attempt to detect if the failure is because of not using a KRPC port.
if (backend_port_specified
&& msg.find("RemoteShutdown() RPC failed: Timed out: connection negotiation to")
!= string::npos) {
// Prior to IMPALA-7985 :shutdown() used the backend port.
err_string.append(" This may be because the port specified is wrong. You may have"
" specified the backend (thrift) port which :shutdown() can no"
" longer use. Please make sure the correct KRPC port is being"
" used, or don't specify any port in the :shutdown() command.");
}
return Status(err_string);
}

Status shutdown_status(resp.status);
Status shutdown_status(resp.status());
RETURN_IF_ERROR(shutdown_status);
SetResultSet({ImpalaServer::ShutdownStatusToString(resp.shutdown_status)});
SetResultSet({ImpalaServer::ShutdownStatusToString(resp.shutdown_status())});
return Status::OK();
}

Expand Down
25 changes: 19 additions & 6 deletions be/src/service/control-service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "common/constant-strings.h"
#include "exec/kudu-util.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/rpc/rpc_controller.h"
#include "rpc/rpc-mgr.h"
#include "rpc/rpc-mgr.inline.h"
#include "runtime/coordinator.h"
Expand Down Expand Up @@ -94,7 +95,7 @@ bool ControlService::Authorize(const google::protobuf::Message* req,
}

Status ControlService::GetProfile(const ReportExecStatusRequestPB& request,
const ClientRequestState& request_state, kudu::rpc::RpcContext* rpc_context,
const ClientRequestState& request_state, RpcContext* rpc_context,
TRuntimeProfileForest* thrift_profiles) {
// Debug action to simulate deserialization failure.
RETURN_IF_ERROR(DebugAction(request_state.query_options(),
Expand All @@ -110,7 +111,7 @@ Status ControlService::GetProfile(const ReportExecStatusRequestPB& request,
}

void ControlService::ReportExecStatus(const ReportExecStatusRequestPB* request,
ReportExecStatusResponsePB* response, kudu::rpc::RpcContext* rpc_context) {
ReportExecStatusResponsePB* response, RpcContext* rpc_context) {
const TUniqueId query_id = ProtoToQueryId(request->query_id());
shared_ptr<ClientRequestState> request_state =
ExecEnv::GetInstance()->impala_server()->GetClientRequestState(query_id);
Expand Down Expand Up @@ -152,20 +153,21 @@ void ControlService::ReportExecStatus(const ReportExecStatusRequestPB* request,
RespondAndReleaseRpc(resp_status, response, rpc_context);
}

template<typename ResponsePBType>
void ControlService::RespondAndReleaseRpc(const Status& status, ResponsePBType* response,
kudu::rpc::RpcContext* rpc_context) {
template <typename ResponsePBType>
void ControlService::RespondAndReleaseRpc(
const Status& status, ResponsePBType* response, RpcContext* rpc_context) {
status.ToProto(response->mutable_status());
// Release the memory against the control service's memory tracker.
mem_tracker_->Release(rpc_context->GetTransferSize());
rpc_context->RespondSuccess();
}

void ControlService::CancelQueryFInstances(const CancelQueryFInstancesRequestPB* request,
CancelQueryFInstancesResponsePB* response, ::kudu::rpc::RpcContext* rpc_context) {
CancelQueryFInstancesResponsePB* response, RpcContext* rpc_context) {
DCHECK(request->has_query_id());
const TUniqueId& query_id = ProtoToQueryId(request->query_id());
VLOG_QUERY << "CancelQueryFInstances(): query_id=" << PrintId(query_id);
// TODO(IMPALA-8143) Use DebugAction for fault injection.
FAULT_INJECTION_RPC_DELAY(RPC_CANCELQUERYFINSTANCES);
QueryState::ScopedRef qs(query_id);
if (qs.get() == nullptr) {
Expand All @@ -177,4 +179,15 @@ void ControlService::CancelQueryFInstances(const CancelQueryFInstancesRequestPB*
qs->Cancel();
RespondAndReleaseRpc(Status::OK(), response, rpc_context);
}

void ControlService::RemoteShutdown(const RemoteShutdownParamsPB* req,
RemoteShutdownResultPB* response, RpcContext* rpc_context) {
// TODO(IMPALA-8143) Use DebugAction for fault injection.
FAULT_INJECTION_RPC_DELAY(RPC_REMOTESHUTDOWN);
Status status = ExecEnv::GetInstance()->impala_server()->StartShutdown(
req->has_deadline_s() ? req->deadline_s() : -1,
response->mutable_shutdown_status());

RespondAndReleaseRpc(status, response, rpc_context);
}
}
35 changes: 35 additions & 0 deletions be/src/service/control-service.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,16 @@

#include "gen-cpp/control_service.service.h"

#include "kudu/rpc/rpc_context.h"
#include "kudu/rpc/rpc_controller.h"
#include "util/debug-util.h"

#include "common/status.h"

using kudu::MonoDelta;
using kudu::rpc::RpcContext;
using kudu::rpc::RpcController;

namespace kudu {
namespace rpc {
class RpcContext;
Expand Down Expand Up @@ -59,11 +67,38 @@ class ControlService : public ControlServiceIf {
virtual void CancelQueryFInstances(const CancelQueryFInstancesRequestPB* req,
CancelQueryFInstancesResponsePB* resp, ::kudu::rpc::RpcContext* context) override;

/// Initiate shutdown.
virtual void RemoteShutdown(const RemoteShutdownParamsPB* req,
RemoteShutdownResultPB* response, ::kudu::rpc::RpcContext* context) override;

/// Gets a ControlService proxy to a server with 'address' and 'hostname'.
/// The newly created proxy is returned in 'proxy'. Returns error status on failure.
static Status GetProxy(const TNetworkAddress& address, const std::string& hostname,
std::unique_ptr<ControlServiceProxy>* proxy);

/// Invokes 'rpc_call' until it succeeds or 'times_to_try' attempts have been made.
/// Each attempt is given a fresh RpcController with a timeout of 'timeout_s' seconds.
/// Attempts are made back to back; there is no sleeping between retries.
/// Before each attempt 'debug_action' is passed to DebugAction() to potentially inject
/// errors; an injected error uses up the attempt without issuing the Rpc.
/// The kudu::Status returned by 'rpc_call' is converted with FromKuduStatus() using
/// 'error_msg'. Returns the status of the last attempt made.
template <typename F>
static Status DoRpcWithRetry(F&& rpc_call, const TQueryCtx& query_ctx,
    const char* debug_action, const char* error_msg, int times_to_try, int timeout_s) {
  DCHECK_GT(times_to_try, 0);
  Status last_status;
  int attempts_left = times_to_try;
  while (attempts_left-- > 0) {
    RpcController controller;
    controller.set_timeout(MonoDelta::FromSeconds(timeout_s));

    // Check for injected failures before actually issuing the Rpc.
    last_status = DebugAction(query_ctx.client_request.query_options, debug_action);
    if (last_status.ok()) {
      last_status = FromKuduStatus(rpc_call(&controller), error_msg);
      if (last_status.ok()) return last_status;
      // TODO(IMPALA-8143) Add a sleep if RpcMgr::IsServerTooBusy().
    }
  }
  return last_status;
}

private:
/// Tracks the memory usage of payload in the service queue.
std::unique_ptr<MemTracker> mem_tracker_;
Expand Down
5 changes: 4 additions & 1 deletion be/src/service/impala-http-handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
#include "runtime/timestamp-value.h"
#include "runtime/timestamp-value.inline.h"
#include "scheduling/admission-controller.h"
#include "service/impala-server.h"
#include "service/client-request-state.h"
#include "service/frontend.h"
#include "service/impala-server.h"
#include "thrift/protocol/TDebugProtocol.h"
#include "util/coding-util.h"
#include "util/logging-support.h"
Expand Down Expand Up @@ -854,7 +854,10 @@ void ImpalaHttpHandler::BackendsHandler(const Webserver::ArgumentMap& args,
Value backend_obj(kObjectType);
string address = TNetworkAddressToString(backend.address);
Value str(address.c_str(), document->GetAllocator());
Value krpc_address(
TNetworkAddressToString(backend.krpc_address).c_str(), document->GetAllocator());
backend_obj.AddMember("address", str, document->GetAllocator());
backend_obj.AddMember("krpc_address", krpc_address, document->GetAllocator());
backend_obj.AddMember("is_coordinator", backend.is_coordinator,
document->GetAllocator());
backend_obj.AddMember("is_executor", backend.is_executor, document->GetAllocator());
Expand Down
8 changes: 0 additions & 8 deletions be/src/service/impala-internal-service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,3 @@ void ImpalaInternalService::PublishFilter(TPublishFilterResult& return_val,
if (qs.get() == nullptr) return;
qs->PublishFilter(params);
}

// Thrift handler for RemoteShutdown. Calls ImpalaServer::StartShutdown() with the
// deadline from 'params' (or -1 when none is set), letting it fill in
// 'return_val.shutdown_status', and stores the resulting status in 'return_val.status'.
void ImpalaInternalService::RemoteShutdown(TRemoteShutdownResult& return_val,
    const TRemoteShutdownParams& params) {
  FAULT_INJECTION_RPC_DELAY(RPC_REMOTESHUTDOWN);

  // -1 is what gets passed on when the caller did not set a deadline.
  int64_t deadline_s = -1;
  if (params.__isset.deadline_s) deadline_s = params.deadline_s;

  Status shutdown_result =
      impala_server_->StartShutdown(deadline_s, &return_val.shutdown_status);
  shutdown_result.ToThrift(&return_val.status);
}
2 changes: 0 additions & 2 deletions be/src/service/impala-internal-service.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ class ImpalaInternalService : public ImpalaInternalServiceIf {
const TUpdateFilterParams& params);
virtual void PublishFilter(TPublishFilterResult& return_val,
const TPublishFilterParams& params);
virtual void RemoteShutdown(TRemoteShutdownResult& return_val,
const TRemoteShutdownParams& params);

private:
ImpalaServer* impala_server_;
Expand Down
Loading

0 comments on commit 31c6e2a

Please sign in to comment.