Skip to content

Commit

Permalink
Implement cancellation of remote processes (pantsbuild#17341)
Browse files Browse the repository at this point in the history
The new streaming remexec client does not support cancellation of remote processes (as added to the original client in pantsbuild#8222): this change reintroduces cancellation using the same strategy.
  • Loading branch information
stuhood authored Oct 26, 2022
1 parent d8c0488 commit 09af33d
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 46 deletions.
106 changes: 84 additions & 22 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,14 @@ use std::time::{Duration, Instant, SystemTime};
use async_oncecell::OnceCell;
use async_trait::async_trait;
use bytes::Bytes;
use concrete_time::TimeSpan;
use fs::{self, DirectoryDigest, File, PathStat, RelativePath, EMPTY_DIRECTORY_DIGEST};
use futures::future::{self, BoxFuture, TryFutureExt};
use futures::FutureExt;
use futures::{Stream, StreamExt};
use grpc_util::headers_to_http_header_map;
use grpc_util::prost::MessageExt;
use grpc_util::{layered_service, status_to_str, LayeredService};
use hashing::{Digest, Fingerprint};
use futures::{FutureExt, Stream, StreamExt};
use log::{debug, trace, warn, Level};
use prost::Message;
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use protos::gen::google::longrunning::Operation;
use protos::gen::google::longrunning::{
operations_client::OperationsClient, CancelOperationRequest, Operation,
};
use protos::gen::google::rpc::{PreconditionFailure, Status as StatusProto};
use protos::require_digest;
use rand::{thread_rng, Rng};
Expand All @@ -31,11 +26,19 @@ use remexec::{
ExecuteRequest, ExecuteResponse, ExecutedActionMetadata, ServerCapabilities,
WaitExecutionRequest,
};
use store::{Snapshot, SnapshotOps, Store, StoreError, StoreFileByDigest};
use tonic::metadata::BinaryMetadataValue;
use tonic::{Code, Request, Status};
use tryfuture::try_future;
use uuid::Uuid;

use concrete_time::TimeSpan;
use fs::{self, DirectoryDigest, File, PathStat, RelativePath, EMPTY_DIRECTORY_DIGEST};
use grpc_util::headers_to_http_header_map;
use grpc_util::prost::MessageExt;
use grpc_util::{layered_service, status_to_str, LayeredService};
use hashing::{Digest, Fingerprint};
use store::{Snapshot, SnapshotOps, Store, StoreError, StoreFileByDigest};
use task_executor::Executor;
use workunit_store::{
in_workunit, Metric, ObservationMetric, RunId, RunningWorkunit, SpanId, WorkunitMetadata,
WorkunitStore,
Expand Down Expand Up @@ -104,7 +107,9 @@ pub struct CommandRunner {
instance_name: Option<String>,
process_cache_namespace: Option<String>,
store: Store,
executor: Executor,
execution_client: Arc<ExecutionClient<LayeredService>>,
operations_client: Arc<OperationsClient<LayeredService>>,
overall_deadline: Duration,
retry_interval_duration: Duration,
capabilities_cell: Arc<OnceCell<ServerCapabilities>>,
Expand All @@ -113,7 +118,47 @@ pub struct CommandRunner {

enum StreamOutcome {
Complete(OperationOrStatus),
StreamClosed(Option<String>),
StreamClosed,
}

/// A single remote Operation, with a `Drop` implementation to cancel the work if our client goes
/// away.
///
/// The `name` field is what arms cancellation: while it is `None` (before the server has reported
/// an operation name, or after `completed` has been called), dropping this struct does nothing.
struct RunningOperation {
// The operation name most recently reported by the server, if any. `wait_on_operation_stream`
// assigns it from each received `Operation`, with empty/blank names stored as `None`.
name: Option<String>,
// Client used to send the `CancelOperationRequest` when this struct is dropped.
operations_client: Arc<OperationsClient<LayeredService>>,
// Executor onto which the cancellation request is spawned when this struct is dropped.
executor: Executor,
}

impl RunningOperation {
fn new(operations_client: Arc<OperationsClient<LayeredService>>, executor: Executor) -> Self {
Self {
name: None,
operations_client,
executor,
}
}

/// Marks the operation completed, which will avoid attempts to cancel it when this struct is
/// dropped.
fn completed(&mut self) {
let _ = self.name.take();
}
}

impl Drop for RunningOperation {
  /// Requests best-effort cancellation of the remote operation, unless it has no name yet or
  /// has been marked `completed`.
  ///
  /// Cancellation failures are logged at debug level rather than silently discarded, so that a
  /// server which rejects or does not support `CancelOperation` leaves a trace in the logs.
  fn drop(&mut self) {
    // `name` is only `Some` while a named operation is still outstanding.
    if let Some(operation_name) = self.name.take() {
      debug!("Canceling remote operation {operation_name}");
      // The client is cloned out of the `Arc` because the spawned task must own its client.
      let mut operations_client = self.operations_client.as_ref().clone();
      let request = CancelOperationRequest {
        name: operation_name.clone(),
      };
      // `drop` cannot await, so the request is handed to the executor and the value returned
      // by `spawn` is discarded.
      // NOTE(review): assumes the executor runs the task even though that value is dropped;
      // confirm against `Executor::spawn`.
      let _ = self.executor.spawn(async move {
        if let Err(status) = operations_client.cancel_operation(request).await {
          debug!("Failed to cancel remote operation {operation_name}: {status}");
        }
      });
    }
  }
}

impl CommandRunner {
Expand All @@ -125,6 +170,7 @@ impl CommandRunner {
root_ca_certs: Option<Vec<u8>>,
headers: BTreeMap<String, String>,
store: Store,
executor: Executor,
overall_deadline: Duration,
retry_interval_duration: Duration,
execution_concurrency_limit: usize,
Expand All @@ -151,14 +197,16 @@ impl CommandRunner {
execution_http_headers,
);
let execution_client = Arc::new(ExecutionClient::new(execution_channel.clone()));

let operations_client = Arc::new(OperationsClient::new(execution_channel.clone()));
let capabilities_client = Arc::new(CapabilitiesClient::new(execution_channel));

let command_runner = CommandRunner {
instance_name,
process_cache_namespace,
execution_client,
operations_client,
store,
executor,
overall_deadline,
retry_interval_duration,
capabilities_cell: capabilities_cell_opt.unwrap_or_else(|| Arc::new(OnceCell::new())),
Expand Down Expand Up @@ -195,11 +243,15 @@ impl CommandRunner {
// Outputs progress reported by the server and returns the next actionable operation
// or gRPC status back to the main loop. The operation name is recorded on
// `running_operation` so that the main loop can reconnect to (or cancel) the operation.
async fn wait_on_operation_stream<S>(&self, mut stream: S, context: &Context) -> StreamOutcome
async fn wait_on_operation_stream<S>(
&self,
mut stream: S,
context: &Context,
running_operation: &mut RunningOperation,
) -> StreamOutcome
where
S: Stream<Item = Result<Operation, Status>> + Unpin,
{
let mut operation_name_opt: Option<String> = None;
let mut start_time_opt = Some(Instant::now());

trace!(
Expand Down Expand Up @@ -234,7 +286,7 @@ impl CommandRunner {
// Extract the operation name.
// Note: protobuf can return empty string for an empty field so convert empty strings
// to None.
operation_name_opt = Some(operation.name.clone()).filter(|s| !s.trim().is_empty());
running_operation.name = Some(operation.name.clone()).filter(|s| !s.trim().is_empty());

// Continue monitoring if the operation is not complete.
if !operation.done {
Expand All @@ -258,7 +310,7 @@ impl CommandRunner {
None => {
// Stream disconnected unexpectedly.
debug!("wait_on_operation_stream: unexpected disconnect from RE server");
return StreamOutcome::StreamClosed(operation_name_opt);
return StreamOutcome::StreamClosed;
}
}
}
Expand Down Expand Up @@ -570,7 +622,8 @@ impl CommandRunner {
const MAX_BACKOFF_DURATION: Duration = Duration::from_secs(10);

let start_time = Instant::now();
let mut current_operation_name: Option<String> = None;
let mut running_operation =
RunningOperation::new(self.operations_client.clone(), self.executor.clone());
let mut num_retries = 0;

loop {
Expand All @@ -585,7 +638,7 @@ impl CommandRunner {
tokio::time::sleep(sleep_time).await;
}

let rpc_result = match current_operation_name {
let rpc_result = match running_operation.name {
None => {
// The request has not been submitted yet. Submit the request using the REv2
// Execute method.
Expand Down Expand Up @@ -624,7 +677,7 @@ impl CommandRunner {
// or status to interpret.
let operation_stream = operation_stream_response.into_inner();
let stream_outcome = self
.wait_on_operation_stream(operation_stream, context)
.wait_on_operation_stream(operation_stream, context, &mut running_operation)
.await;

match stream_outcome {
Expand All @@ -634,10 +687,17 @@ impl CommandRunner {
context.build_id,
status
);
// We completed this operation.
running_operation.completed();
status
}
StreamOutcome::StreamClosed(operation_name_opt) => {
trace!("wait_on_operation_stream (build_id={}) returned stream close, will retry operation_name={:?}", context.build_id, operation_name_opt);
StreamOutcome::StreamClosed => {
trace!(
"wait_on_operation_stream (build_id={}) returned stream close, \
will retry operation_name={:?}",
context.build_id,
running_operation.name
);

// Check if the number of request attempts sent thus far have exceeded the number
// of retries allowed since the last successful connection. (There is no point in
Expand All @@ -653,7 +713,6 @@ impl CommandRunner {
}

// Iterate the loop to reconnect to the operation.
current_operation_name = operation_name_opt;
continue;
}
}
Expand All @@ -664,6 +723,9 @@ impl CommandRunner {
message: status.message().to_owned(),
..StatusProto::default()
};
// `OperationOrStatus` always represents a completed operation, so this operation
// is completed.
running_operation.completed();
OperationOrStatus::Status(status_proto)
}
};
Expand Down
Loading

0 comments on commit 09af33d

Please sign in to comment.