Skip to content

Commit

Permalink
ext_authz: Add Google gRPC client cache (envoyproxy#13265)
Browse files Browse the repository at this point in the history
This patch adds Google gRPC client cache for ext_authz filter to avoid channel creation for each request, which greatly increased the latency.

Risk Level: Low
Testing: Added
Docs Changes: N/A
Release Notes: N/A

Signed-off-by: Fangpeng Liu <[email protected]>
  • Loading branch information
fpliu233 authored Oct 7, 2020
1 parent efd7cc5 commit ede6604
Show file tree
Hide file tree
Showing 14 changed files with 223 additions and 12 deletions.
1 change: 1 addition & 0 deletions include/envoy/grpc/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ class RawAsyncClient {
};

using RawAsyncClientPtr = std::unique_ptr<RawAsyncClient>;
using RawAsyncClientSharedPtr = std::shared_ptr<RawAsyncClient>;

} // namespace Grpc
} // namespace Envoy
1 change: 1 addition & 0 deletions source/common/grpc/google_async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ GoogleAsyncClientImpl::GoogleAsyncClientImpl(Event::Dispatcher& dispatcher,
// new connection implied.
std::shared_ptr<grpc::Channel> channel = GoogleGrpcUtils::createChannel(config, api);
stub_ = stub_factory.createStub(channel);
scope_->counterFromStatName(stat_names.google_grpc_client_creation_).inc();
// Initialize client stats.
// TODO(jmarantz): Capture these names in async_client_manager_impl.cc and
// pass in a struct of StatName objects so we don't have to take locks here.
Expand Down
3 changes: 2 additions & 1 deletion source/common/grpc/stat_names.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ namespace Envoy {
namespace Grpc {

StatNames::StatNames(Stats::SymbolTable& symbol_table)
: pool_(symbol_table), streams_total_(pool_.add("streams_total")) {
: pool_(symbol_table), streams_total_(pool_.add("streams_total")),
google_grpc_client_creation_(pool_.add("google_grpc_client_creation")) {
for (uint32_t i = 0; i <= Status::WellKnownGrpcStatus::MaximumKnown; ++i) {
std::string status_str = absl::StrCat(i);
streams_closed_[i] = pool_.add(absl::StrCat("streams_closed_", status_str));
Expand Down
2 changes: 2 additions & 0 deletions source/common/grpc/stat_names.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ struct StatNames {
Stats::StatName streams_total_;
std::array<Stats::StatName, Status::WellKnownGrpcStatus::MaximumKnown + 1> streams_closed_;
absl::flat_hash_map<std::string, Stats::StatName> status_names_;
// Stat name tracking the creation of the Google grpc client.
Stats::StatName google_grpc_client_creation_;
};

} // namespace Grpc
Expand Down
3 changes: 2 additions & 1 deletion source/common/grpc/typed_async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ template <typename Request, typename Response> class AsyncClient /* : public Raw
public:
AsyncClient() = default;
AsyncClient(RawAsyncClientPtr&& client) : client_(std::move(client)) {}
AsyncClient(RawAsyncClientSharedPtr client) : client_(client) {}
virtual ~AsyncClient() = default;

virtual AsyncRequest* send(const Protobuf::MethodDescriptor& service_method,
Expand Down Expand Up @@ -192,7 +193,7 @@ template <typename Request, typename Response> class AsyncClient /* : public Raw
void reset() { client_.reset(); }

private:
RawAsyncClientPtr client_{};
RawAsyncClientSharedPtr client_{};
};

} // namespace Grpc
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/common/ext_authz/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ envoy_cc_library(
"//source/common/protobuf",
"//source/common/tracing:http_tracer_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/http/ext_authz/v3:pkg_cc_proto",
"@envoy_api//envoy/service/auth/v2alpha:pkg_cc_proto",
"@envoy_api//envoy/service/auth/v3:pkg_cc_proto",
],
Expand Down
21 changes: 19 additions & 2 deletions source/extensions/filters/common/ext_authz/ext_authz_grpc_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ namespace Filters {
namespace Common {
namespace ExtAuthz {

GrpcClientImpl::GrpcClientImpl(Grpc::RawAsyncClientPtr&& async_client,
GrpcClientImpl::GrpcClientImpl(Grpc::RawAsyncClientSharedPtr async_client,
const absl::optional<std::chrono::milliseconds>& timeout,
envoy::config::core::v3::ApiVersion transport_api_version,
bool use_alpha)
: async_client_(std::move(async_client)), timeout_(timeout),
: async_client_(async_client), timeout_(timeout),
service_method_(Grpc::VersionedMethods("envoy.service.auth.v3.Authorization.Check",
"envoy.service.auth.v2.Authorization.Check",
"envoy.service.auth.v2alpha.Authorization.Check")
Expand Down Expand Up @@ -143,6 +143,23 @@ void GrpcClientImpl::toAuthzResponseHeader(
}
}

const Grpc::RawAsyncClientSharedPtr AsyncClientCache::getOrCreateAsyncClient(
const envoy::extensions::filters::http::ext_authz::v3::ExtAuthz& proto_config) {
// The cache stores Google gRPC client, so channel is not created for each request.
ASSERT(proto_config.has_grpc_service() && proto_config.grpc_service().has_google_grpc());
auto& cache = tls_slot_->getTyped<ThreadLocalCache>();
const std::size_t cache_key = MessageUtil::hash(proto_config.grpc_service().google_grpc());
const auto it = cache.async_clients_.find(cache_key);
if (it != cache.async_clients_.end()) {
return it->second;
}
const Grpc::AsyncClientFactoryPtr factory =
async_client_manager_.factoryForGrpcService(proto_config.grpc_service(), scope_, true);
const Grpc::RawAsyncClientSharedPtr async_client = factory->create();
cache.async_clients_.emplace(cache_key, async_client);
return async_client;
}

} // namespace ExtAuthz
} // namespace Common
} // namespace Filters
Expand Down
36 changes: 35 additions & 1 deletion source/extensions/filters/common/ext_authz/ext_authz_grpc_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <vector>

#include "envoy/config/core/v3/base.pb.h"
#include "envoy/extensions/filters/http/ext_authz/v3/ext_authz.pb.h"
#include "envoy/grpc/async_client.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/http/filter.h"
Expand Down Expand Up @@ -44,7 +45,7 @@ class GrpcClientImpl : public Client,
public Logger::Loggable<Logger::Id::ext_authz> {
public:
// TODO(gsagula): remove `use_alpha` param when V2Alpha gets deprecated.
GrpcClientImpl(Grpc::RawAsyncClientPtr&& async_client,
GrpcClientImpl(Grpc::RawAsyncClientSharedPtr async_client,
const absl::optional<std::chrono::milliseconds>& timeout,
envoy::config::core::v3::ApiVersion transport_api_version, bool use_alpha);
~GrpcClientImpl() override;
Expand Down Expand Up @@ -81,6 +82,39 @@ class GrpcClientImpl : public Client,

using GrpcClientImplPtr = std::unique_ptr<GrpcClientImpl>;

// The client cache for RawAsyncClient for Google grpc so channel is not created for each request.
// TODO(fpliu233): The cache will cause resource leak that a new channel is created every time a new
// config is pushed. Improve gRPC channel cache with better solution.
class AsyncClientCache : public Singleton::Instance {
public:
AsyncClientCache(Grpc::AsyncClientManager& async_client_manager, Stats::Scope& scope,
ThreadLocal::SlotAllocator& tls)
: async_client_manager_(async_client_manager), scope_(scope), tls_slot_(tls.allocateSlot()) {
tls_slot_->set([](Event::Dispatcher&) { return std::make_shared<ThreadLocalCache>(); });
}

const Grpc::RawAsyncClientSharedPtr getOrCreateAsyncClient(
const envoy::extensions::filters::http::ext_authz::v3::ExtAuthz& proto_config);

private:
/**
* Per-thread cache.
*/
struct ThreadLocalCache : public ThreadLocal::ThreadLocalObject {
ThreadLocalCache() = default;
// The client cache stored with key as hash of
// envoy::config::core::v3::GrpcService::GoogleGrpc config.
// TODO(fpliu233): Remove when the cleaner and generic solution for gRPC is live.
absl::flat_hash_map<std::size_t, Grpc::RawAsyncClientSharedPtr> async_clients_;
};

Grpc::AsyncClientManager& async_client_manager_;
Stats::Scope& scope_;
ThreadLocal::SlotPtr tls_slot_;
};

using AsyncClientCacheSharedPtr = std::shared_ptr<AsyncClientCache>;

} // namespace ExtAuthz
} // namespace Common
} // namespace Filters
Expand Down
31 changes: 29 additions & 2 deletions source/extensions/filters/http/ext_authz/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,23 @@ Http::FilterFactoryCb ExtAuthzFilterConfig::createFilterFactoryFromProtoTyped(
callbacks.addStreamDecoderFilter(Http::StreamDecoderFilterSharedPtr{
std::make_shared<Filter>(filter_config, std::move(client))});
};
} else if (proto_config.grpc_service().has_google_grpc()) {
// Google gRPC client.
const uint32_t timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config.grpc_service(), timeout, DefaultTimeout);
auto async_client_cache = getAsyncClientCacheSingleton(context);
callback = [async_client_cache, filter_config, timeout_ms, proto_config,
transport_api_version = proto_config.transport_api_version(),
use_alpha = proto_config.hidden_envoy_deprecated_use_alpha()](
Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<Filters::Common::ExtAuthz::GrpcClientImpl>(
async_client_cache->getOrCreateAsyncClient(proto_config),
std::chrono::milliseconds(timeout_ms), transport_api_version, use_alpha);
callbacks.addStreamDecoderFilter(Http::StreamDecoderFilterSharedPtr{
std::make_shared<Filter>(filter_config, std::move(client))});
};
} else {
// gRPC client.
// Envoy gRPC client.
const uint32_t timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config.grpc_service(), timeout, DefaultTimeout);
callback = [grpc_service = proto_config.grpc_service(), &context, filter_config, timeout_ms,
Expand All @@ -61,7 +76,7 @@ Http::FilterFactoryCb ExtAuthzFilterConfig::createFilterFactoryFromProtoTyped(
}

return callback;
};
}

Router::RouteSpecificFilterConfigConstSharedPtr
ExtAuthzFilterConfig::createRouteSpecificFilterConfigTyped(
Expand All @@ -76,6 +91,18 @@ ExtAuthzFilterConfig::createRouteSpecificFilterConfigTyped(
REGISTER_FACTORY(ExtAuthzFilterConfig,
Server::Configuration::NamedHttpFilterConfigFactory){"envoy.ext_authz"};

SINGLETON_MANAGER_REGISTRATION(google_grpc_async_client_cache);

Filters::Common::ExtAuthz::AsyncClientCacheSharedPtr
getAsyncClientCacheSingleton(Server::Configuration::FactoryContext& context) {
return context.singletonManager().getTyped<Filters::Common::ExtAuthz::AsyncClientCache>(
SINGLETON_MANAGER_REGISTERED_NAME(google_grpc_async_client_cache), [&context] {
return std::make_shared<Filters::Common::ExtAuthz::AsyncClientCache>(
context.clusterManager().grpcAsyncClientManager(), context.scope(),
context.threadLocal());
});
}

} // namespace ExtAuthz
} // namespace HttpFilters
} // namespace Extensions
Expand Down
4 changes: 4 additions & 0 deletions source/extensions/filters/http/ext_authz/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "envoy/extensions/filters/http/ext_authz/v3/ext_authz.pb.h"
#include "envoy/extensions/filters/http/ext_authz/v3/ext_authz.pb.validate.h"

#include "extensions/filters/common/ext_authz/ext_authz_grpc_impl.h"
#include "extensions/filters/http/common/factory_base.h"
#include "extensions/filters/http/well_known_names.h"

Expand Down Expand Up @@ -33,6 +34,9 @@ class ExtAuthzFilterConfig
ProtobufMessage::ValidationVisitor& validator) override;
};

Filters::Common::ExtAuthz::AsyncClientCacheSharedPtr
getAsyncClientCacheSingleton(Server::Configuration::FactoryContext& context);

} // namespace ExtAuthz
} // namespace HttpFilters
} // namespace Extensions
Expand Down
1 change: 1 addition & 0 deletions test/extensions/filters/common/ext_authz/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ envoy_cc_test(
deps = [
"//source/extensions/filters/common/ext_authz:ext_authz_grpc_lib",
"//test/extensions/filters/common/ext_authz:ext_authz_test_common",
"//test/mocks/thread_local:thread_local_mocks",
"//test/mocks/tracing:tracing_mocks",
"//test/test_common:test_runtime_lib",
"@envoy_api//envoy/service/auth/v2alpha:pkg_cc_proto",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "test/extensions/filters/common/ext_authz/test_common.h"
#include "test/mocks/grpc/mocks.h"
#include "test/mocks/stream_info/mocks.h"
#include "test/mocks/thread_local/mocks.h"
#include "test/mocks/tracing/mocks.h"
#include "test/test_common/test_runtime.h"

Expand Down Expand Up @@ -373,6 +374,65 @@ TEST_P(ExtAuthzGrpcClientTest, AuthorizationOkWithDynamicMetadata) {
client_->onSuccess(std::move(check_response), span_);
}

class AsyncClientCacheTest : public testing::Test {
public:
AsyncClientCacheTest() {
client_cache_ = std::make_unique<AsyncClientCache>(async_client_manager_, scope_, tls_);
}

void expectClientCreation() {
factory_ = new Grpc::MockAsyncClientFactory;
async_client_ = new Grpc::MockAsyncClient;
EXPECT_CALL(async_client_manager_, factoryForGrpcService(_, _, true))
.WillOnce(Invoke([this](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) {
EXPECT_CALL(*factory_, create()).WillOnce(Invoke([this] {
return Grpc::RawAsyncClientPtr{async_client_};
}));
return Grpc::AsyncClientFactoryPtr{factory_};
}));
}

NiceMock<ThreadLocal::MockInstance> tls_;
Grpc::MockAsyncClientManager async_client_manager_;
Grpc::MockAsyncClient* async_client_ = nullptr;
Grpc::MockAsyncClientFactory* factory_ = nullptr;
std::unique_ptr<AsyncClientCache> client_cache_;
NiceMock<Stats::MockIsolatedStatsStore> scope_;
};

TEST_F(AsyncClientCacheTest, Deduplication) {
Stats::IsolatedStoreImpl scope;
testing::InSequence s;

envoy::extensions::filters::http::ext_authz::v3::ExtAuthz config;
config.mutable_grpc_service()->mutable_google_grpc()->set_target_uri("dns://test01");
config.mutable_grpc_service()->mutable_google_grpc()->set_credentials_factory_name(
"test_credential01");

expectClientCreation();
Grpc::RawAsyncClientSharedPtr test_client_01 = client_cache_->getOrCreateAsyncClient(config);
// Fetches the existing client.
EXPECT_EQ(test_client_01, client_cache_->getOrCreateAsyncClient(config));

config.mutable_grpc_service()->mutable_google_grpc()->set_credentials_factory_name(
"test_credential02");
expectClientCreation();
// Different credentials use different clients.
EXPECT_NE(test_client_01, client_cache_->getOrCreateAsyncClient(config));
Grpc::RawAsyncClientSharedPtr test_client_02 = client_cache_->getOrCreateAsyncClient(config);

config.mutable_grpc_service()->mutable_google_grpc()->set_credentials_factory_name(
"test_credential02");
// No creation, fetching the existing one.
EXPECT_EQ(test_client_02, client_cache_->getOrCreateAsyncClient(config));

// Different targets use different clients.
config.mutable_grpc_service()->mutable_google_grpc()->set_target_uri("dns://test02");
expectClientCreation();
EXPECT_NE(test_client_01, client_cache_->getOrCreateAsyncClient(config));
EXPECT_NE(test_client_02, client_cache_->getOrCreateAsyncClient(config));
}

} // namespace ExtAuthz
} // namespace Common
} // namespace Filters
Expand Down
2 changes: 2 additions & 0 deletions test/extensions/filters/http/ext_authz/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ void expectCorrectProtoGrpc(envoy::config::core::v3::ApiVersion api_version) {
fmt::format(yaml, TestUtility::getVersionStringFromApiVersion(api_version)), *proto_config);

testing::StrictMock<Server::Configuration::MockFactoryContext> context;
EXPECT_CALL(context, singletonManager()).Times(1);
EXPECT_CALL(context, threadLocal()).Times(1);
EXPECT_CALL(context, messageValidationVisitor()).Times(1);
EXPECT_CALL(context, clusterManager()).Times(1);
EXPECT_CALL(context, runtime()).Times(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,11 +311,7 @@ class ExtAuthzGrpcIntegrationTest : public Grpc::VersionedGrpcClientIntegrationP

void cleanup() {
if (fake_ext_authz_connection_ != nullptr) {
if (clientType() != Grpc::ClientType::GoogleGrpc) {
AssertionResult result = fake_ext_authz_connection_->close();
RELEASE_ASSERT(result, result.message());
}
AssertionResult result = fake_ext_authz_connection_->waitForDisconnect();
AssertionResult result = fake_ext_authz_connection_->close();
RELEASE_ASSERT(result, result.message());
}
cleanupUpstreamAndDownstream();
Expand Down Expand Up @@ -827,4 +823,67 @@ TEST_P(ExtAuthzLocalReplyIntegrationTest, DeniedHeaderTest) {
cleanup();
}

TEST_P(ExtAuthzGrpcIntegrationTest, GoogleAsyncClientCreation) {
initializeConfig();
setDownstreamProtocol(Http::CodecClient::Type::HTTP2);
HttpIntegrationTest::initialize();
initiateClientConnection(4, Headers{}, Headers{});

waitForExtAuthzRequest(expectedCheckRequest(Http::CodecClient::Type::HTTP2));
if (clientType() == Grpc::ClientType::GoogleGrpc) {
// Make sure one Google grpc client is created.
EXPECT_EQ(1, test_server_->counter("grpc.ext_authz.google_grpc_client_creation")->value());
}
sendExtAuthzResponse(Headers{}, Headers{}, Headers{}, Http::TestRequestHeaderMapImpl{},
Http::TestRequestHeaderMapImpl{});

waitForSuccessfulUpstreamResponse("200");

Http::TestRequestHeaderMapImpl headers{
{":method", "POST"}, {":path", "/test"}, {":scheme", "http"}, {":authority", "host"}};
TestUtility::feedBufferWithRandomCharacters(request_body_, 4);
response_ = codec_client_->makeRequestWithBody(headers, request_body_.toString());

auto result = fake_ext_authz_connection_->waitForNewStream(*dispatcher_, ext_authz_request_);
RELEASE_ASSERT(result, result.message());

envoy::service::auth::v3::CheckRequest check_request;
result = ext_authz_request_->waitForGrpcMessage(*dispatcher_, check_request);
RELEASE_ASSERT(result, result.message());

EXPECT_EQ("POST", ext_authz_request_->headers().getMethodValue());
EXPECT_EQ(TestUtility::getVersionedMethodPath("envoy.service.auth.{}.Authorization", "Check",
apiVersion()),
ext_authz_request_->headers().getPathValue());
EXPECT_EQ("application/grpc", ext_authz_request_->headers().getContentTypeValue());
result = ext_authz_request_->waitForEndStream(*dispatcher_);
RELEASE_ASSERT(result, result.message());

if (clientType() == Grpc::ClientType::GoogleGrpc) {
// Make sure one Google grpc client is created.
EXPECT_EQ(1, test_server_->counter("grpc.ext_authz.google_grpc_client_creation")->value());
}
sendExtAuthzResponse(Headers{}, Headers{}, Headers{}, Http::TestRequestHeaderMapImpl{},
Http::TestRequestHeaderMapImpl{});

result = fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_);
RELEASE_ASSERT(result, result.message());
result = upstream_request_->waitForEndStream(*dispatcher_);
RELEASE_ASSERT(result, result.message());

upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false);
upstream_request_->encodeData(response_size_, true);

response_->waitForEndStream();

EXPECT_TRUE(upstream_request_->complete());
EXPECT_EQ(request_body_.length(), upstream_request_->bodyLength());

EXPECT_TRUE(response_->complete());
EXPECT_EQ("200", response_->headers().getStatusValue());
EXPECT_EQ(response_size_, response_->body().size());

cleanup();
}

} // namespace Envoy

0 comments on commit ede6604

Please sign in to comment.