Skip to content

Commit

Permalink
[master] add RPC to reset AuthzProvider's cache
Browse files Browse the repository at this point in the history
This changelist adds an RPC end-point to control SentryAuthzProvider's
privileges cache.  As of now, it's only possible to reset the cache
of the aggregated SentryPrivilegesFetcher via the newly added
ResetAuthzCache() method.

The method has been added into the master's RPC interface.  It's
necessary to have admin/superuser credentials to successfully invoke
the method via Kudu RPC.

Also, added few tests to cover the new functionality.

Change-Id: Ib9a5c1f84172acf5751f68edfb8a9bb25df6b3f6
Reviewed-on: http://gerrit.cloudera.org:8080/13063
Tested-by: Kudu Jenkins
Reviewed-by: Hao Hao <[email protected]>
Reviewed-by: Andrew Wong <[email protected]>
  • Loading branch information
alexeyserbin committed Apr 26, 2019
1 parent bacc1f3 commit fdf2ae0
Show file tree
Hide file tree
Showing 14 changed files with 354 additions and 63 deletions.
240 changes: 232 additions & 8 deletions src/kudu/integration-tests/master_sentry-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,50 +15,58 @@
// specific language governing permissions and limitations
// under the License.

#include <atomic>
#include <cstdlib>
#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <thread>
#include <unordered_set>
#include <utility>
#include <vector>

#include <boost/optional/optional.hpp>
#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/client/client.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/table_util.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/hms/hms_client.h"
#include "kudu/hms/mini_hms.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
#include "kudu/integration-tests/hms_itest-base.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/master.proxy.h"
#include "kudu/master/sentry_authz_provider-test-base.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/user_credentials.h"
#include "kudu/security/test/mini_kdc.h"
#include "kudu/sentry/mini_sentry.h"
#include "kudu/sentry/sentry_client.h"
#include "kudu/sentry/sentry_policy_service_types.h"
#include "kudu/thrift/client.h"
#include "kudu/util/monotime.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"

using ::sentry::TSentryGrantOption;
using ::sentry::TSentryPrivilege;
using boost::make_optional;
using boost::none;
using boost::optional;
using kudu::client::KuduClient;
using kudu::client::KuduColumnSchema;
using kudu::client::KuduScanToken;
using kudu::client::KuduScanTokenBuilder;
using kudu::client::KuduSchema;
Expand All @@ -68,12 +76,19 @@ using kudu::client::sp::shared_ptr;
using kudu::hms::HmsClient;
using kudu::master::GetTableLocationsResponsePB;
using kudu::master::MasterServiceProxy;
using kudu::master::ResetAuthzCacheRequestPB;
using kudu::master::ResetAuthzCacheResponsePB;
using kudu::master::TabletLocationsPB;
using kudu::rpc::RpcController;
using kudu::rpc::UserCredentials;
using kudu::sentry::SentryClient;
using sentry::TSentryGrantOption;
using sentry::TSentryPrivilege;
using std::atomic;
using std::function;
using std::ostream;
using std::string;
using std::thread;
using std::unique_ptr;
using std::unordered_set;
using std::vector;
Expand Down Expand Up @@ -180,8 +195,8 @@ class SentryITestBase : public HmsITestBase {
return client_->IsCreateTableInProgress(table, &in_progress);
}

Status DeleteTable(const string& table,
const string& /*new_table*/) {
Status DropTable(const string& table,
const string& /*new_table*/) {
return client_->DeleteTable(table);
}

Expand Down Expand Up @@ -509,9 +524,9 @@ static const AuthzDescriptor kAuthzCombinations[] = {
},
{
{
&SentryITestBase::DeleteTable,
&SentryITestBase::DropTable,
&SentryITestBase::GrantDropTablePrivilege,
"DeleteTable",
"DropTable",
},
SentryITestBase::kDatabaseName,
SentryITestBase::kTableName,
Expand Down Expand Up @@ -663,9 +678,9 @@ TEST_P(AuthzErrorHandlingTest, TestNonExistentTable) {

static const AuthzFuncs kAuthzFuncCombinations[] = {
{
&SentryITestBase::DeleteTable,
&SentryITestBase::DropTable,
&SentryITestBase::GrantDropTablePrivilege,
"DeleteTable"
"DropTable"
},
{
&SentryITestBase::AlterTable,
Expand Down Expand Up @@ -702,4 +717,213 @@ static const AuthzFuncs kAuthzFuncCombinations[] = {
INSTANTIATE_TEST_CASE_P(AuthzFuncCombinations,
AuthzErrorHandlingTest,
::testing::ValuesIn(kAuthzFuncCombinations));

// Class for test scenarios verifying functionality of managing AuthzProvider's
// privileges cache via Kudu RPC.
class SentryAuthzProviderCacheITest : public SentryITestBase {
public:
bool IsAuthzPrivilegeCacheEnabled() const override {
return true;
}

Status ResetCache() {
// ResetAuthzCache() RPC requires admin/superuser credentials, so this
// method calls Kinit(kAdminUser) to authenticate appropriately. However,
// it's necessary to return back the credentials of the regular user after
// resetting the cache since the rest of the scenario is supposed to run
// without superuser credentials.
SCOPED_CLEANUP({
WARN_NOT_OK(cluster_->kdc()->Kinit(kTestUser),
"could not restore Kerberos credentials");
});
RETURN_NOT_OK(cluster_->kdc()->Kinit(kAdminUser));
std::shared_ptr<MasterServiceProxy> proxy = cluster_->master_proxy();
UserCredentials user_credentials;
user_credentials.set_real_user(kAdminUser);
proxy->set_user_credentials(user_credentials);

RpcController ctl;
ResetAuthzCacheResponsePB res;
RETURN_NOT_OK(proxy->ResetAuthzCache(
ResetAuthzCacheRequestPB(), &res, &ctl));
return res.has_error() ? StatusFromPB(res.error().status()) : Status::OK();
}
};

// This test scenario uses AlterTable() to make sure AuthzProvider's cache
// empties upon successful ResetAuthzCache() RPC.
TEST_F(SentryAuthzProviderCacheITest, AlterTable) {
const auto table_name = Substitute("$0.$1", kDatabaseName, kTableName);
ASSERT_OK(GrantAlterTablePrivilege(kDatabaseName, kTableName));
{
unique_ptr<KuduTableAlterer> table_alterer(
client_->NewTableAlterer(table_name)->DropColumn("int8_val"));
auto s = table_alterer->Alter();
ASSERT_TRUE(s.ok()) << s.ToString();
}
ASSERT_OK(StopSentry());
{
unique_ptr<KuduTableAlterer> table_alterer(
client_->NewTableAlterer(table_name)->DropColumn("int16_val"));
auto s = table_alterer->Alter();
ASSERT_TRUE(s.ok()) << s.ToString();
}
ASSERT_OK(ResetCache());
{
// After resetting the cache, it should not be possible to perform another
// ALTER TABLE operation: no entries are in the cache, so authz provider
// needs to fetch information from Sentry directly.
unique_ptr<KuduTableAlterer> table_alterer(
client_->NewTableAlterer(table_name)->DropColumn("int32_val"));
auto s = table_alterer->Alter();
ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
}

// Try to do the same after starting Sentry back. It should be a success.
ASSERT_OK(StartSentry());
{
unique_ptr<KuduTableAlterer> table_alterer(
client_->NewTableAlterer(table_name)->DropColumn("int32_val"));
auto s = table_alterer->Alter();
ASSERT_TRUE(s.ok()) << s.ToString();
}
}

// This test scenario calls a couple of authz methods of SentryAuthzProvider
// while resetting its fetcher's cache in parallel using master's
// ResetAuthzCache() RPC.
TEST_F(SentryAuthzProviderCacheITest, ResetAuthzCacheConcurrentAlterTable) {
constexpr const auto num_threads = 16;
const auto run_interval = AllowSlowTests() ? MonoDelta::FromSeconds(8)
: MonoDelta::FromSeconds(1);
ASSERT_OK(GrantCreateTablePrivilege(kDatabaseName, ""));
for (auto idx = 0; idx < num_threads; ++idx) {
ASSERT_OK(CreateTable(Substitute("$0.$1", kDatabaseName, idx), ""));
ASSERT_OK(GrantAlterTablePrivilege(kDatabaseName, Substitute("$0", idx)));
}

vector<Status> threads_task_status(num_threads);
{
atomic<bool> stopped(false);
vector<thread> threads;

SCOPED_CLEANUP({
stopped = true;
for (auto& thread : threads) {
thread.join();
}
});

for (auto idx = 0; idx < num_threads; ++idx) {
const auto thread_idx = idx;
threads.emplace_back([&, thread_idx] () {
const auto table_name = Substitute("$0.$1", kDatabaseName, thread_idx);

while (!stopped) {
SleepFor(MonoDelta::FromMicroseconds((rand() % 2 + 1) * thread_idx));
{
unique_ptr<KuduTableAlterer> table_alterer(
client_->NewTableAlterer(table_name));
table_alterer->DropColumn("int8_val");

auto s = table_alterer->Alter();
if (!s.ok()) {
threads_task_status[thread_idx] = s;
return;
}
}

SleepFor(MonoDelta::FromMicroseconds((rand() % 3 + 1) * thread_idx));
{
unique_ptr<KuduTableAlterer> table_alterer(
client_->NewTableAlterer(table_name));
table_alterer->AddColumn("int8_val")->Type(KuduColumnSchema::INT8);
auto s = table_alterer->Alter();
if (!s.ok()) {
threads_task_status[thread_idx] = s;
return;
}
}
}
});
}

const auto time_beg = MonoTime::Now();
const auto time_end = time_beg + run_interval;
while (MonoTime::Now() < time_end) {
SleepFor(MonoDelta::FromMilliseconds((rand() % 3)));
ASSERT_OK(ResetCache());
}
}
for (auto idx = 0; idx < threads_task_status.size(); ++idx) {
SCOPED_TRACE(Substitute("results for thread $0", idx));
const auto& s = threads_task_status[idx];
EXPECT_TRUE(s.ok()) << s.ToString();
}
}

// Basic test to verify access control and functionality of
// the ResetAuthzCache(); integration with Sentry is not enabled.
class AuthzCacheControlTest : public ExternalMiniClusterITestBase {
public:
void SetUp() override {
ExternalMiniClusterITestBase::SetUp();
cluster::ExternalMiniClusterOptions opts;
opts.enable_kerberos = true;
StartClusterWithOpts(opts);
}

// Utility method to call master's ResetAuthzCache RPC under the credentials
// of the specified 'user'. The credentials should have been set appropriately
// before calling this method (e.g., call kinit if necessary).
Status ResetCache(const string& user,
RpcController* ctl,
ResetAuthzCacheResponsePB* resp) {
RETURN_NOT_OK(cluster_->kdc()->Kinit(user));
std::shared_ptr<MasterServiceProxy> proxy = cluster_->master_proxy();
UserCredentials user_credentials;
user_credentials.set_real_user(user);
proxy->set_user_credentials(user_credentials);

ResetAuthzCacheRequestPB req;
return proxy->ResetAuthzCache(req, resp, ctl);
}
};

TEST_F(AuthzCacheControlTest, ResetCacheNoSentryIntegration) {
// Non-admin users (i.e. those not in --superuser_acl) are not allowed
// to reset authz cache.
for (const auto& user : { "test-user", "joe-interloper", }) {
ASSERT_OK(cluster_->kdc()->Kinit(user));
ResetAuthzCacheResponsePB resp;
RpcController ctl;
auto s = ResetCache(user, &ctl, &resp);
ASSERT_TRUE(s.IsRemoteError()) << s.ToString();
ASSERT_FALSE(resp.has_error());

const auto* err_status = ctl.error_response();
ASSERT_NE(nullptr, err_status);
ASSERT_TRUE(err_status->has_code());
ASSERT_EQ(rpc::ErrorStatusPB::ERROR_APPLICATION, err_status->code());
ASSERT_STR_CONTAINS(err_status->message(),
"unauthorized access to method: ResetAuthzCache");
}

// The cache can be reset with credentials of a super-user. However, in
// case if integration with Sentry is not enabled, the AuthzProvider
// doesn't have any cache.
{
ResetAuthzCacheResponsePB resp;
RpcController ctl;
const auto s = ResetCache("test-admin", &ctl, &resp);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(nullptr, ctl.error_response());
ASSERT_TRUE(resp.has_error()) << resp.error().DebugString();
const auto app_s = StatusFromPB(resp.error().status());
ASSERT_TRUE(app_s.IsNotSupported()) << app_s.ToString();
ASSERT_STR_CONTAINS(app_s.ToString(),
"provider does not have privileges cache");
}
}

} // namespace kudu
5 changes: 5 additions & 0 deletions src/kudu/master/authz_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ class AuthzProvider {
// Stops the AuthzProvider instance.
virtual void Stop() = 0;

// Reset the underlying cache (if any), invalidating all cached entries.
// Returns Status::NotSupported() if the provider doesn't support resetting
// its cache.
virtual Status ResetCache() = 0;

// Checks if the table creation is authorized for the given user.
// If the table is being created with a different owner than the user,
// then more strict privilege is required.
Expand Down
4 changes: 4 additions & 0 deletions src/kudu/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,10 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
return hms_catalog_.get();
}

master::AuthzProvider* authz_provider() const {
return authz_provider_.get();
}

// Returns the normalized form of the provided table name.
//
// If the HMS integration is configured and the table name is a valid HMS
Expand Down
4 changes: 4 additions & 0 deletions src/kudu/master/default_authz_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class DefaultAuthzProvider : public AuthzProvider {

void Stop() override {}

Status ResetCache() override {
return Status::NotSupported("provider does not have privileges cache");
}

Status AuthorizeCreateTable(const std::string& /*table_name*/,
const std::string& /*user*/,
const std::string& /*owner*/) override WARN_UNUSED_RESULT {
Expand Down
Loading

0 comments on commit fdf2ae0

Please sign in to comment.