Skip to content

Commit

Permalink
Merge pull request facebook#116 from anirbanr-fb/persistent_vars
Browse files Browse the repository at this point in the history
Add support for persisting variables
  • Loading branch information
yashtc authored Jun 30, 2021
2 parents 7880782 + cc53c10 commit 6ab77c9
Show file tree
Hide file tree
Showing 13 changed files with 525 additions and 3 deletions.
23 changes: 22 additions & 1 deletion src/kudu/consensus/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,24 @@ ADD_EXPORTABLE_LIBRARY(consensus_proto
DEPS ${CONSENSUS_KRPC_LIBS}
NONLINK_DEPS ${CONSENSUS_KRPC_TGTS})

#########################################
# persistent_vars_proto
#########################################

PROTOBUF_GENERATE_CPP(
PERSISTENT_VARS_PROTO_SRCS PERSISTENT_VARS_PROTO_HDRS PERSISTENT_VARS_PROTO_TGTS
SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
PROTO_FILES
persistent_vars.proto)
set(PERSISTENT_VARS_PROTO_LIBS
kudu_common_proto
protobuf)
ADD_EXPORTABLE_LIBRARY(persistent_vars_proto
SRCS ${PERSISTENT_VARS_PROTO_SRCS}
DEPS ${PERSISTENT_VARS_PROTO_LIBS}
NONLINK_DEPS ${PERSISTENT_VARS_PROTO_TGTS})

#########################################
# log_proto
#########################################
Expand Down Expand Up @@ -104,7 +122,8 @@ target_link_libraries(log
kudu_util_compression
consensus_proto
log_proto
consensus_metadata_proto)
consensus_metadata_proto
persistent_vars_proto)

set(CONSENSUS_SRCS
consensus_meta.cc
Expand All @@ -114,6 +133,8 @@ set(CONSENSUS_SRCS
leader_election.cc
log_cache.cc
peer_manager.cc
persistent_vars.cc
persistent_vars_manager.cc
pending_rounds.cc
quorum_util.cc
raft_consensus.cc
Expand Down
116 changes: 116 additions & 0 deletions src/kudu/consensus/persistent_vars.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/consensus/persistent_vars.h"

#include <gflags/gflags.h>
#include <glog/logging.h>

#include "kudu/consensus/persistent_vars.pb.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
#include "kudu/util/logging.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"

namespace kudu {
namespace consensus {

using std::lock_guard;
using std::string;
using strings::Substitute;

bool PersistentVars::is_start_election_allowed() const {
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
DCHECK(pb_.has_allow_start_election());
return pb_.allow_start_election();
}

void PersistentVars::set_allow_start_election(bool val) {
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
pb_.set_allow_start_election(val);
}

Status PersistentVars::Flush(FlushMode flush_mode) {
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500, LogPrefix(), "flushing persistent variables");

// Create directories if needed.
string dir = fs_manager_->GetConsensusMetadataDir();
bool created_dir = false;
RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(
fs_manager_->env(), dir, &created_dir),
"Unable to create consensus metadata root dir");
// fsync() parent dir if we had to create the dir.
if (PREDICT_FALSE(created_dir)) {
string parent_dir = DirName(dir);
RETURN_NOT_OK_PREPEND(Env::Default()->SyncDir(parent_dir),
"Unable to fsync consensus parent dir " + parent_dir);
}

string persistent_vars_file_path = fs_manager_->GetPersistentVarsPath(tablet_id_);
RETURN_NOT_OK_PREPEND(pb_util::WritePBContainerToPath(
fs_manager_->env(), persistent_vars_file_path, pb_,
flush_mode == OVERWRITE ? pb_util::OVERWRITE : pb_util::NO_OVERWRITE,
pb_util::SYNC),
Substitute("Unable to write persistent vars file for tablet $0 to path $1",
tablet_id_, persistent_vars_file_path));
return Status::OK();
}

PersistentVars::PersistentVars(FsManager* fs_manager,
std::string tablet_id,
std::string peer_uuid)
: fs_manager_(CHECK_NOTNULL(fs_manager)),
tablet_id_(std::move(tablet_id)),
peer_uuid_(std::move(peer_uuid)) {}

Status PersistentVars::Create(FsManager* fs_manager,
const string& tablet_id,
const std::string& peer_uuid,
scoped_refptr<PersistentVars>* persistent_vars_out) {

scoped_refptr<PersistentVars> persistent_vars(new PersistentVars(fs_manager, tablet_id, peer_uuid));

RETURN_NOT_OK(persistent_vars->Flush(NO_OVERWRITE)); // Create() should not clobber.

if (persistent_vars_out) *persistent_vars_out = std::move(persistent_vars);
return Status::OK();
}

Status PersistentVars::Load(FsManager* fs_manager,
const std::string& tablet_id,
const std::string& peer_uuid,
scoped_refptr<PersistentVars>* persistent_vars_out) {
scoped_refptr<PersistentVars> persistent_vars(new PersistentVars(fs_manager, tablet_id, peer_uuid));
RETURN_NOT_OK(pb_util::ReadPBContainerFromPath(fs_manager->env(),
fs_manager->GetPersistentVarsPath(tablet_id),
&persistent_vars->pb_));
if (persistent_vars_out) *persistent_vars_out = std::move(persistent_vars);
return Status::OK();
}

std::string PersistentVars::LogPrefix() const {
// No need to lock to read const members.
return Substitute("T $0 P $1: ", tablet_id_, peer_uuid_);
}

} // namespace consensus
} // namespace kudu
100 changes: 100 additions & 0 deletions src/kudu/consensus/persistent_vars.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <atomic>
#include <cstdint>
#include <deque>
#include <string>

#include "kudu/consensus/persistent_vars.pb.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/threading/thread_collision_warner.h"

namespace kudu {

class FsManager;
class Status;

namespace consensus {

class PersistentVarsManager; // IWYU pragma: keep
class PersistentVarsTest; // IWYU pragma: keep

// Provides methods to read and write persistent variables.
// This class is not thread-safe and requires external synchronization.
class PersistentVars : public RefCountedThreadSafe<PersistentVars> {
public:

// Specify whether we are allowed to overwrite an existing file when flushing.
enum FlushMode {
OVERWRITE,
NO_OVERWRITE
};

// Accessor for whether starting elections is allowed
bool is_start_election_allowed() const;

// Allow/Disallow starting elections
void set_allow_start_election(bool val);

// Persist current state of the protobuf to disk.
Status Flush(FlushMode flush_mode = OVERWRITE);

private:
friend class RefCountedThreadSafe<PersistentVars>;
friend class PersistentVarsManager;

PersistentVars(FsManager* fs_manager,
std::string tablet_id,
std::string peer_uuid);

// Create a PersistentVars object; the encoded PB is flushed to disk before
// returning
static Status Create(FsManager* fs_manager,
const std::string& tablet_id,
const std::string& peer_uuid,
scoped_refptr<PersistentVars>* persistent_vars_out = nullptr);

// Load a PersistentVars object from disk.
// Returns Status::NotFound if the file could not be found. May return other
// Status codes if unable to read the file.
static Status Load(FsManager* fs_manager,
const std::string& tablet_id,
const std::string& peer_uuid,
scoped_refptr<PersistentVars>* persistent_vars_out = nullptr);

std::string LogPrefix() const;

FsManager* const fs_manager_;
const std::string tablet_id_;
const std::string peer_uuid_;

// This fake mutex helps ensure that this PersistentVars object stays
// externally synchronized.
DFAKE_MUTEX(fake_lock_);

// Durable fields.
PersistentVarsPB pb_;

DISALLOW_COPY_AND_ASSIGN(PersistentVars);
};

} // namespace consensus
} // namespace kudu
29 changes: 29 additions & 0 deletions src/kudu/consensus/persistent_vars.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

syntax = "proto2";
package kudu.consensus;

option java_package = "org.apache.kudu.consensus";

// This PB is used to serialize all of the persistent variables to disk whenever
// they are updated. These will be read during Init()
message PersistentVarsPB {
// Flag to allow starting elections on the peer. If disabled, the peer will
// not start elections - even if there are heartbeat failures from the leader
optional bool allow_start_election = 1 [ default = true ];
}
91 changes: 91 additions & 0 deletions src/kudu/consensus/persistent_vars_manager.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/consensus/persistent_vars_manager.h"

#include <memory>
#include <mutex>
#include <utility>

#include <glog/logging.h>

#include "kudu/consensus/persistent_vars.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/status.h"

namespace kudu {
namespace consensus {

using std::lock_guard;
using std::shared_ptr;
using std::string;
using strings::Substitute;

PersistentVarsManager::PersistentVarsManager(FsManager* fs_manager)
: fs_manager_(DCHECK_NOTNULL(fs_manager)) {
}

Status PersistentVarsManager::CreatePersistentVars(const string& tablet_id,
scoped_refptr<PersistentVars>* persistent_vars_out) {
scoped_refptr<PersistentVars> persistent_vars;
RETURN_NOT_OK_PREPEND(PersistentVars::Create(fs_manager_, tablet_id, fs_manager_->uuid(),
&persistent_vars),
Substitute("Unable to create consensus metadata for tablet $0", tablet_id));

lock_guard<Mutex> l(persistent_vars_lock_);
if (!InsertIfNotPresent(&persistent_vars_cache_, tablet_id, persistent_vars)) {
return Status::AlreadyPresent(Substitute("PersistentVars instance for $0 already exists",
tablet_id));
}
if (persistent_vars_out) *persistent_vars_out = std::move(persistent_vars);
return Status::OK();
}

Status PersistentVarsManager::LoadPersistentVars(const string& tablet_id,
scoped_refptr<PersistentVars>* persistent_vars_out) {
{
lock_guard<Mutex> l(persistent_vars_lock_);

// Try to get the persistent_vars instance from cache first.
scoped_refptr<PersistentVars>* cached_persistent_vars = FindOrNull(persistent_vars_cache_, tablet_id);
if (cached_persistent_vars) {
if (persistent_vars_out) *persistent_vars_out = *cached_persistent_vars;
return Status::OK();
}
}

// If it's not yet cached, drop the lock before we load it.
scoped_refptr<PersistentVars> persistent_vars;
RETURN_NOT_OK_PREPEND(PersistentVars::Load(fs_manager_, tablet_id, fs_manager_->uuid(),
&persistent_vars),
Substitute("Unable to load persistent vars for tablet $0", tablet_id));

// Cache and return the loaded PersistentVars.
{
lock_guard<Mutex> l(persistent_vars_lock_);
// Due to our thread-safety contract, no other caller may have interleaved
// with us for this tablet id, so we use InsertOrDie().
InsertOrDie(&persistent_vars_cache_, tablet_id, persistent_vars);
}

if (persistent_vars_out) *persistent_vars_out = std::move(persistent_vars);
return Status::OK();
}

} // namespace consensus
} // namespace kudu
Loading

0 comments on commit 6ab77c9

Please sign in to comment.