Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
[flow_control] Fixes for the flow_control_fixes experiment found via …
Browse files Browse the repository at this point in the history
…fuzzing (grpc#31676)

* [flow_control] Enable experiment in debug builds, fix bugs found by fuzzer

* fix

* disable experiment

* flowctl

* flowctl

* Automated change: Fix sanity tests

* fix

* fix

Co-authored-by: ctiller <[email protected]>
  • Loading branch information
ctiller and ctiller authored Nov 17, 2022
1 parent b6f2af4 commit fe5aace
Show file tree
Hide file tree
Showing 9 changed files with 466 additions and 15 deletions.
26 changes: 23 additions & 3 deletions src/core/ext/transport/chttp2/transport/flow_control.cc
Original file line number Diff line number Diff line change
Expand Up @@ -278,16 +278,21 @@ void TransportFlowControl::UpdateSetting(
Clamp(new_desired_value, grpc_chttp2_settings_parameters[id].min_value,
grpc_chttp2_settings_parameters[id].max_value);
if (new_desired_value != *desired_value) {
*desired_value = new_desired_value;
if (grpc_flowctl_trace.enabled()) {
gpr_log(GPR_INFO, "[flowctl] UPDATE SETTING %s from %" PRId64 " to %d",
grpc_chttp2_settings_parameters[id].name, *desired_value,
new_desired_value);
}
// Reaching zero can only happen for initial window size, and if it occurs
// we really want to wake up writes and ensure all the queued stream
// window updates are flushed, since stream flow control operates
// differently at zero window size.
FlowControlAction::Urgency urgency =
FlowControlAction::Urgency::QUEUE_UPDATE;
if (new_desired_value == 0) {
if (*desired_value == 0 || new_desired_value == 0) {
urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
}
*desired_value = new_desired_value;
(action->*set)(urgency, *desired_value);
}
} else {
Expand All @@ -302,6 +307,21 @@ void TransportFlowControl::UpdateSetting(
}
}

FlowControlAction TransportFlowControl::SetAckedInitialWindow(uint32_t value) {
acked_init_window_ = value;
FlowControlAction action;
if (IsFlowControlFixesEnabled() &&
acked_init_window_ != target_initial_window_size_) {
FlowControlAction::Urgency urgency =
FlowControlAction::Urgency::QUEUE_UPDATE;
if (acked_init_window_ == 0 || target_initial_window_size_ == 0) {
urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
}
action.set_send_initial_window_update(urgency, target_initial_window_size_);
}
return action;
}

FlowControlAction TransportFlowControl::PeriodicUpdate() {
FlowControlAction action;
if (enable_bdp_probe_) {
Expand Down Expand Up @@ -393,7 +413,7 @@ uint32_t StreamFlowControl::MaybeSendUpdate() {
pending_size_ = absl::nullopt;
tfc_upd.UpdateAnnouncedWindowDelta(&announced_window_delta_, announce);
GPR_ASSERT(DesiredAnnounceSize() == 0);
tfc_upd.MakeAction();
std::ignore = tfc_upd.MakeAction();
return static_cast<uint32_t>(announce);
}

Expand Down
4 changes: 2 additions & 2 deletions src/core/ext/transport/chttp2/transport/flow_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ enum class StallEdge { kNoChange, kStalled, kUnstalled };
// Encapsulates a collections of actions the transport needs to take with
// regard to flow control. Each action comes with urgencies that tell the
// transport how quickly the action must take place.
class FlowControlAction {
class GRPC_MUST_USE_RESULT FlowControlAction {
public:
enum class Urgency : uint8_t {
// Nothing to be done.
Expand Down Expand Up @@ -261,7 +261,7 @@ class TransportFlowControl final {
uint32_t acked_init_window() const { return acked_init_window_; }
uint32_t sent_init_window() const { return target_initial_window_size_; }

void SetAckedInitialWindow(uint32_t value) { acked_init_window_ = value; }
FlowControlAction SetAckedInitialWindow(uint32_t value);

// Getters
int64_t remote_window() const { return remote_window_; }
Expand Down
8 changes: 5 additions & 3 deletions src/core/ext/transport/chttp2/transport/parsing.cc
Original file line number Diff line number Diff line change
Expand Up @@ -697,9 +697,11 @@ static grpc_error_handle init_settings_frame_parser(grpc_chttp2_transport* t) {
t->hpack_parser.hpack_table()->SetMaxBytes(
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
t->flow_control.SetAckedInitialWindow(
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
grpc_chttp2_act_on_flowctl_action(
t->flow_control.SetAckedInitialWindow(
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
t, nullptr);
t->sent_local_settings = false;
}
t->parser = grpc_chttp2_settings_parser_parse;
Expand Down
20 changes: 19 additions & 1 deletion test/core/transport/chttp2/flow_control_fuzzer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "absl/base/attributes.h"
#include "absl/status/status.h"
#include "absl/strings/str_join.h"
#include "absl/types/optional.h"

#include <grpc/event_engine/memory_request.h>
Expand Down Expand Up @@ -220,7 +221,9 @@ void FlowControlFuzzer::Perform(const flow_control_fuzzer::Action& action) {
fprintf(stderr, "Received ACK for initial window size %d\n",
*sent_from_remote.ack_initial_window_size);
}
tfc_->SetAckedInitialWindow(*sent_from_remote.ack_initial_window_size);
PerformAction(tfc_->SetAckedInitialWindow(
*sent_from_remote.ack_initial_window_size),
nullptr);
sending_initial_window_size_ = false;
}
if (sent_from_remote.bdp_pong) {
Expand Down Expand Up @@ -356,6 +359,7 @@ void FlowControlFuzzer::AssertNoneStuck() const {
std::map<uint32_t, int64_t> reconciled_stream_deltas;
int64_t reconciled_transport_window = remote_transport_window_size_;
int64_t reconciled_initial_window = remote_initial_window_size_;
std::vector<uint64_t> inflight_send_initial_windows;
for (const auto& id_stream : streams_) {
reconciled_stream_deltas[id_stream.first] = id_stream.second.window_delta;
}
Expand All @@ -365,6 +369,8 @@ void FlowControlFuzzer::AssertNoneStuck() const {
for (const auto& send_to_remote : send_to_remote_) {
if (send_to_remote.initial_window_size.has_value()) {
reconciled_initial_window = *send_to_remote.initial_window_size;
inflight_send_initial_windows.push_back(
*send_to_remote.initial_window_size);
}
reconciled_transport_window += send_to_remote.transport_window_update;
for (const auto& stream_update : send_to_remote.stream_window_updates) {
Expand All @@ -381,6 +387,14 @@ void FlowControlFuzzer::AssertNoneStuck() const {
}
}

// If we're sending an initial window size we get to consider a queued initial
// window size too: it'll be sent as soon as the remote acks the settings
// change, which it must.
if (sending_initial_window_size_ && queued_initial_window_size_.has_value()) {
reconciled_initial_window = *queued_initial_window_size_;
inflight_send_initial_windows.push_back(*queued_initial_window_size_);
}

// Finally, if a stream has indicated it's willing to read, the reconciled
// remote *MUST* be in a state where it could send at least one byte.
for (const auto& id_stream : streams_) {
Expand All @@ -396,6 +410,10 @@ void FlowControlFuzzer::AssertNoneStuck() const {
reconciled_stream_deltas[id_stream.first],
reconciled_initial_window,
(id_stream.second.fc.min_progress_size()));
fprintf(stderr,
"initial_window breakdown: remote=%" PRId32 ", in-flight={%s}\n",
remote_initial_window_size_,
absl::StrJoin(inflight_send_initial_windows, ",").c_str());
abort();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
enable_bdp: true
actions {
set_min_progress_size {
id: 11264
size: 2883584
}
}
actions {
set_pending_size {
id: 1593844738
size: 11264
}
}
actions {
set_memory_quota: 0
}
actions {
step_time_ms: 474140901376
}
actions {
periodic_update {
}
}
actions {
stream_write {
id: 11264
size: 2883584
}
}
actions {
step_time_ms: 474140901376
}
actions {
periodic_update {
}
}
actions {
read_send_from_remote {
}
}
actions {
step_time_ms: 474140931328
}
actions {
read_send_from_remote {
}
}
actions {
read_send_to_remote {
}
}
actions {
read_send_from_remote {
}
}
actions {
read_send_from_remote {
}
}
actions {
set_memory_quota: 474140901376
}
actions {
read_send_to_remote {
}
}
actions {
allocate_memory: 1572864
}
actions {
periodic_update {
}
}
actions {
step_time_ms: 474140901376
}
actions {
periodic_update {
}
}
actions {
step_time_ms: 474140901376
}
actions {
periodic_update {
}
}
actions {
set_min_progress_size {
id: 1593844738
size: 11264
}
}
actions {
step_time_ms: 474140901376
}
actions {
periodic_update {
}
}
actions {
read_send_from_remote {
}
}
actions {
periodic_update {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
enable_bdp: true
actions {
perform_send_from_remote {
}
}
actions {
periodic_update {
}
}
actions {
set_memory_quota: 0
}
actions {
step_time_ms: 474140901376
}
actions {
periodic_update {
}
}
actions {
stream_write {
id: 2883584
size: 2883584
}
}
actions {
step_time_ms: 474140901376
}
actions {
periodic_update {
}
}
actions {
periodic_update {
}
}
actions {
step_time_ms: 474140901376
}
actions {
read_send_from_remote {
}
}
actions {
read_send_to_remote {
}
}
actions {
read_send_from_remote {
}
}
actions {
read_send_from_remote {
}
}
actions {
set_memory_quota: 474140901376
}
actions {
read_send_to_remote {
}
}
actions {
allocate_memory: 1572864
}
actions {
periodic_update {
}
}
actions {
step_time_ms: 474140901376
}
actions {
periodic_update {
}
}
actions {
step_time_ms: 474140901376
}
actions {
periodic_update {
}
}
actions {
set_min_progress_size {
id: 1593844738
size: 11264
}
}
actions {
step_time_ms: 474140901376
}
actions {
periodic_update {
}
}
actions {
step_time_ms: 474140901376
}
actions {
periodic_update {
}
}
Loading

0 comments on commit fe5aace

Please sign in to comment.