Skip to content

Commit

Permalink
subprocess: add KillAndWait() and allow customization of exit signal
Browse files Browse the repository at this point in the history
This patch does two things:
- It introduces a KillAndWait() method that terminates and fully reaps a
  process.
- It allows one to customize the exit signal delivered to a subprocess when
  it goes out of scope. The default signal is SIGKILL which doesn't let
  subprocesses clean up after themselves.

Change-Id: Iaf31bd4ea6de2917521a9852714fb33cfbec1f61
Reviewed-on: http://gerrit.cloudera.org:8080/6741
Tested-by: Adar Dembo <[email protected]>
Reviewed-by: David Ribeiro Alves <[email protected]>
  • Loading branch information
adembo committed Apr 29, 2017
1 parent a57cb4b commit eb79a71
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 8 deletions.
48 changes: 48 additions & 0 deletions src/kudu/util/subprocess-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,4 +213,52 @@ TEST_F(SubprocessTest, TestGetExitStatusSignaled) {
}
}

TEST_F(SubprocessTest, TestSubprocessDestroyWithCustomSignal) {
string kTestFile = GetTestPath("foo");

// Start a subprocess that creates kTestFile immediately and deletes it on exit.
//
// Note: it's important that the shell not invoke a command while waiting
// to be killed (i.e. "sleep 60"); if it did, the signal could be delivered
// just after the command starts but just before the shell decides to forward
// signals to it, and we wind up with a deadlock.
vector<string> argv = {
"/bin/bash",
"-c",
Substitute(
// Delete kTestFile on exit.
"trap \"rm $0\" EXIT;"
// Create kTestFile on start.
"touch $0;"
// Spin in a tight loop waiting to be killed.
"while true;"
" do FOO=$$((FOO + 1));"
"done", kTestFile)
};

{
Subprocess s(argv);
ASSERT_OK(s.Start());
AssertEventually([&]{
ASSERT_TRUE(env_->FileExists(kTestFile));
});
}

// The subprocess went out of scope and was killed with SIGKILL, so it left
// kTestFile behind.
ASSERT_TRUE(env_->FileExists(kTestFile));

ASSERT_OK(env_->DeleteFile(kTestFile));
{
Subprocess s(argv, SIGTERM);
ASSERT_OK(s.Start());
AssertEventually([&]{
ASSERT_TRUE(env_->FileExists(kTestFile));
});
}

// The subprocess was killed with SIGTERM, giving it a chance to delete kTestFile.
ASSERT_FALSE(env_->FileExists(kTestFile));
}

} // namespace kudu
54 changes: 47 additions & 7 deletions src/kudu/util/subprocess.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/debug-util.h"
#include "kudu/util/errno.h"
#include "kudu/util/monotime.h"
#include "kudu/util/path_util.h"
#include "kudu/util/signal.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/status.h"

using std::string;
Expand All @@ -64,6 +66,8 @@ using ::operator<<;

namespace {

static double kProcessWaitTimeoutSeconds = 5.0;

static const char* kProcSelfFd =
#if defined(__APPLE__)
"/dev/fd";
Expand Down Expand Up @@ -235,13 +239,14 @@ Status ReadFdsFully(const string& progname,

} // anonymous namespace

Subprocess::Subprocess(vector<string> argv)
Subprocess::Subprocess(vector<string> argv, int sig_on_destruct)
: program_(argv[0]),
argv_(std::move(argv)),
state_(kNotStarted),
child_pid_(-1),
fd_state_(),
child_fds_() {
child_fds_(),
sig_on_destruct_(sig_on_destruct) {
// By convention, the first argument in argv is the base name of the program.
argv_[0] = BaseName(argv_[0]);

Expand All @@ -255,11 +260,12 @@ Subprocess::Subprocess(vector<string> argv)

Subprocess::~Subprocess() {
if (state_ == kRunning) {
LOG(WARNING) << "Child process " << child_pid_
<< "(" << JoinStrings(argv_, " ") << ") "
<< " was orphaned. Sending SIGKILL...";
WARN_NOT_OK(Kill(SIGKILL), "Failed to send SIGKILL");
WARN_NOT_OK(Wait(), "Failed to Wait()");
LOG(WARNING) << Substitute(
"Child process $0 ($1) was orphaned. Sending signal $2...",
child_pid_, JoinStrings(argv_, " "), sig_on_destruct_);
WARN_NOT_OK(KillAndWait(sig_on_destruct_),
Substitute("Failed to KillAndWait() with signal $0",
sig_on_destruct_));
}

for (int i = 0; i < 3; ++i) {
Expand Down Expand Up @@ -498,6 +504,40 @@ Status Subprocess::Kill(int signal) {
return Status::OK();
}

Status Subprocess::KillAndWait(int signal) {
string procname = Substitute("$0 (pid $1)", argv0(), pid());

// This is a fatal error because all errors in Kill() are signal-independent,
// so Kill(SIGKILL) is just as likely to fail if this did.
RETURN_NOT_OK_PREPEND(
Kill(signal), Substitute("Failed to send signal $0 to $1",
signal, procname));
if (signal == SIGKILL) {
RETURN_NOT_OK_PREPEND(
Wait(), Substitute("Failed to wait on $0", procname));
} else {
Status s;
Stopwatch sw;
sw.start();
do {
s = WaitNoBlock();
if (s.ok()) {
break;
} else if (!s.IsTimedOut()) {
// An unexpected error in WaitNoBlock() is likely to manifest repeatedly,
// so there's no point in retrying this.
RETURN_NOT_OK_PREPEND(
s, Substitute("Unexpected failure while waiting on $0", procname));
}
SleepFor(MonoDelta::FromMilliseconds(10));
} while (sw.elapsed().wall_seconds() < kProcessWaitTimeoutSeconds);
if (s.IsTimedOut()) {
return KillAndWait(SIGKILL);
}
}
return Status::OK();
}

Status Subprocess::GetExitStatus(int* exit_status, string* info_str) const {
if (state_ != kExited) {
const string err_str = "Sub-process termination hasn't yet been detected";
Expand Down
19 changes: 18 additions & 1 deletion src/kudu/util/subprocess.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#ifndef KUDU_UTIL_SUBPROCESS_H
#define KUDU_UTIL_SUBPROCESS_H

#include <signal.h>

#include <map>
#include <string>
#include <vector>
Expand Down Expand Up @@ -47,7 +49,11 @@ namespace kudu {
// will be forcibly SIGKILLed to avoid orphaning processes.
class Subprocess {
public:
explicit Subprocess(std::vector<std::string> argv);
// Constructs a new Subprocess that will execute 'argv' on Start().
//
// If the process isn't explicitly killed, 'sig_on_destroy' will be delivered
// to it when the Subprocess goes out of scope.
explicit Subprocess(std::vector<std::string> argv, int sig_on_destruct = SIGKILL);
~Subprocess();

// Disable subprocess stream output. Must be called before subprocess starts.
Expand Down Expand Up @@ -100,6 +106,12 @@ class Subprocess {
// in order to reap it. Only call after starting.
Status Kill(int signal) WARN_UNUSED_RESULT;

// Sends a signal to the subprocess and waits for it to exit.
//
// If the signal is not SIGKILL and the process doesn't appear to be exiting,
// retries with SIGKILL.
Status KillAndWait(int signal);

// Retrieve exit status of the process awaited by Wait() and/or WaitNoBlock()
// methods. Must be called only after calling Wait()/WaitNoBlock().
Status GetExitStatus(int* exit_status, std::string* info_str = nullptr) const
Expand Down Expand Up @@ -140,6 +152,7 @@ class Subprocess {
int ReleaseChildStderrFd() { return ReleaseChildFd(STDERR_FILENO); }

pid_t pid() const;
const std::string& argv0() const { return argv_[0]; }

private:
enum State {
Expand Down Expand Up @@ -167,6 +180,10 @@ class Subprocess {
// Only valid if state_ == kExited.
int wait_status_;

// Custom signal to deliver when the subprocess goes out of scope, provided
// the process hasn't already been killed.
int sig_on_destruct_;

DISALLOW_COPY_AND_ASSIGN(Subprocess);
};

Expand Down

0 comments on commit eb79a71

Please sign in to comment.