Skip to content

Commit

Permalink
sync after space/schema changes (vesoft-inc#1690)
Browse files Browse the repository at this point in the history
Add a sync in LastUpdateTimeMan::update

PS: In 3 replicas env, sync only make sure that at least one of the follower has committed. This PR could help in most cases. If we want to totally solve it out, either client only send request to leader of meta server, or write through in meta server, which need be considered more carefully later.
  • Loading branch information
critical27 authored and dutor committed Jan 21, 2020
1 parent 19c5f85 commit 346e640
Show file tree
Hide file tree
Showing 22 changed files with 108 additions and 32 deletions.
3 changes: 3 additions & 0 deletions src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ class KVStore {
std::string&& prefix,
std::unique_ptr<KVIterator>* iter) = delete;

virtual ResultCode sync(GraphSpaceID spaceId,
PartitionID partId) = 0;

virtual void asyncMultiPut(GraphSpaceID spaceId,
PartitionID partId,
std::vector<KV> keyValues,
Expand Down
21 changes: 21 additions & 0 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,27 @@ ResultCode NebulaStore::rangeWithPrefix(GraphSpaceID spaceId,
}


ResultCode NebulaStore::sync(GraphSpaceID spaceId,
PartitionID partId) {
auto partRet = part(spaceId, partId);
if (!ok(partRet)) {
return error(partRet);
}
auto part = nebula::value(partRet);
if (!checkLeader(part)) {
return ResultCode::ERR_LEADER_CHANGED;
}
auto ret = ResultCode::SUCCEEDED;
folly::Baton<true, std::atomic> baton;
part->sync([&] (kvstore::ResultCode code) {
ret = code;
baton.post();
});
baton.wait();
return ret;
}


void NebulaStore::asyncMultiPut(GraphSpaceID spaceId,
PartitionID partId,
std::vector<KV> keyValues,
Expand Down
3 changes: 3 additions & 0 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ class NebulaStore : public KVStore, public Handler {
std::string&& prefix,
std::unique_ptr<KVIterator>* iter) override = delete;

ResultCode sync(GraphSpaceID spaceId,
PartitionID partId) override;

// async batch put.
void asyncMultiPut(GraphSpaceID spaceId,
PartitionID partId,
Expand Down
6 changes: 6 additions & 0 deletions src/kvstore/plugins/hbase/HBaseStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ ResultCode HBaseStore::rangeWithPrefix(GraphSpaceID spaceId,
}


ResultCode HBaseStore::sync(GraphSpaceID spaceId, PartitionID partId) {
UNUSED(spaceId);
UNUSED(partId);
LOG(FATAL) << "Unimplement";
}

ResultCode HBaseStore::multiRemove(GraphSpaceID spaceId,
std::vector<std::string>& keys) {
auto tableName = this->spaceIdToTableName(spaceId);
Expand Down
2 changes: 2 additions & 0 deletions src/kvstore/plugins/hbase/HBaseStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class HBaseStore : public KVStore {
const std::string& prefix,
std::unique_ptr<KVIterator>* iter) override;

ResultCode sync(GraphSpaceID spaceId, PartitionID partId) override;

// async batch put.
void asyncMultiPut(GraphSpaceID spaceId,
PartitionID partId,
Expand Down
8 changes: 4 additions & 4 deletions src/meta/ActiveHostsMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ kvstore::ResultCode LastUpdateTimeMan::update(kvstore::KVStore* kv, const int64_
kvstore::ResultCode ret;
kv->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(data),
[&] (kvstore::ResultCode code) {
ret = code;
baton.post();
});
ret = code;
baton.post();
});
baton.wait();
return ret;
return kv->sync(kDefaultSpaceId, kDefaultPartId);
}

int64_t LastUpdateTimeMan::get(kvstore::KVStore* kv) {
Expand Down
4 changes: 4 additions & 0 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ class BaseProcessor {

bool doSyncPut(std::vector<kvstore::KV> data);

void doSyncPutAndUpdate(std::vector<kvstore::KV> data);

void doSyncMultiRemoveAndUpdate(std::vector<std::string> keys);

protected:
kvstore::KVStore* kvstore_ = nullptr;
RESP resp_;
Expand Down
50 changes: 50 additions & 0 deletions src/meta/processors/BaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -382,5 +382,55 @@ bool BaseProcessor<RESP>::doSyncPut(std::vector<kvstore::KV> data) {
return ret;
}

template<typename RESP>
void BaseProcessor<RESP>::doSyncPutAndUpdate(std::vector<kvstore::KV> data) {
folly::Baton<true, std::atomic> baton;
auto ret = kvstore::ResultCode::SUCCEEDED;
kvstore_->asyncMultiPut(kDefaultSpaceId,
kDefaultPartId,
std::move(data),
[&ret, &baton] (kvstore::ResultCode code) {
if (kvstore::ResultCode::SUCCEEDED != code) {
ret = code;
LOG(INFO) << "Put data error on meta server";
}
baton.post();
});
baton.wait();
if (ret != kvstore::ResultCode::SUCCEEDED) {
this->resp_.set_code(to(ret));
this->onFinished();
return;
}
ret = LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec());
this->resp_.set_code(to(ret));
this->onFinished();
}

template<typename RESP>
void BaseProcessor<RESP>::doSyncMultiRemoveAndUpdate(std::vector<std::string> keys) {
folly::Baton<true, std::atomic> baton;
auto ret = kvstore::ResultCode::SUCCEEDED;
kvstore_->asyncMultiRemove(kDefaultSpaceId,
kDefaultPartId,
std::move(keys),
[&ret, &baton] (kvstore::ResultCode code) {
if (kvstore::ResultCode::SUCCEEDED != code) {
ret = code;
LOG(INFO) << "Remove data error on meta server";
}
baton.post();
});
baton.wait();
if (ret != kvstore::ResultCode::SUCCEEDED) {
this->resp_.set_code(to(ret));
this->onFinished();
return;
}
ret = LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec());
this->resp_.set_code(to(ret));
this->onFinished();
}

} // namespace meta
} // namespace nebula
3 changes: 1 addition & 2 deletions src/meta/processors/configMan/RegConfigProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ void RegConfigProcessor::process(const cpp2::RegConfigReq& req) {
}

if (!data.empty()) {
LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec());
doPut(std::move(data));
doSyncPutAndUpdate(std::move(data));
return;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/configMan/SetConfigProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ void SetConfigProcessor::process(const cpp2::SetConfigReq& req) {
}

if (!data.empty()) {
LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec());
doPut(std::move(data));
doSyncPutAndUpdate(std::move(data));
return;
}
return;
} while (false);
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/indexMan/CreateEdgeIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,9 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) {
std::string(reinterpret_cast<const char*>(&edgeIndex), sizeof(IndexID)));
data.emplace_back(MetaServiceUtils::indexKey(space, edgeIndex),
MetaServiceUtils::indexVal(item));
LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec());
LOG(INFO) << "Create Edge Index " << indexName << ", edgeIndex " << edgeIndex;
resp_.set_id(to(edgeIndex, EntryType::INDEX));
doPut(std::move(data));
doSyncPutAndUpdate(std::move(data));
}

} // namespace meta
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/indexMan/CreateTagIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,9 @@ void CreateTagIndexProcessor::process(const cpp2::CreateTagIndexReq& req) {
std::string(reinterpret_cast<const char*>(&tagIndex), sizeof(IndexID)));
data.emplace_back(MetaServiceUtils::indexKey(space, tagIndex),
MetaServiceUtils::indexVal(item));
LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec());
LOG(INFO) << "Create Tag Index " << indexName << ", tagIndex " << tagIndex;
resp_.set_id(to(tagIndex, EntryType::INDEX));
doPut(std::move(data));
doSyncPutAndUpdate(std::move(data));
}

} // namespace meta
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/indexMan/DropEdgeIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ void DropEdgeIndexProcessor::process(const cpp2::DropEdgeIndexReq& req) {
keys.emplace_back(MetaServiceUtils::indexIndexKey(spaceID, indexName));
keys.emplace_back(MetaServiceUtils::indexKey(spaceID, edgeIndexID.value()));

LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec());
LOG(INFO) << "Drop Edge Index " << indexName;
resp_.set_id(to(edgeIndexID.value(), EntryType::INDEX));
doMultiRemove(keys);
doSyncMultiRemoveAndUpdate(std::move(keys));
}

} // namespace meta
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/indexMan/DropTagIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ void DropTagIndexProcessor::process(const cpp2::DropTagIndexReq& req) {
keys.emplace_back(MetaServiceUtils::indexIndexKey(spaceID, indexName));
keys.emplace_back(MetaServiceUtils::indexKey(spaceID, tagIndexID.value()));

LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec());
LOG(INFO) << "Drop Tag Index " << indexName;
resp_.set_id(to(tagIndexID.value(), EntryType::INDEX));
doMultiRemove(keys);
doSyncMultiRemoveAndUpdate(std::move(keys));
}

} // namespace meta
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/partsMan/CreateSpaceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
}
resp_.set_code(cpp2::ErrorCode::SUCCEEDED);
resp_.set_id(to(spaceId, EntryType::SPACE));
LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec());
doPut(std::move(data));
doSyncPutAndUpdate(std::move(data));
}


Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/partsMan/DropSpaceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ void DropSpaceProcessor::process(const cpp2::DropSpaceReq& req) {
// delete related role data.
// TODO(boshengchen) delete related role data under the space
// TODO(YT) delete Tag/Edge under the space
LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec());
doMultiRemove(std::move(deleteKeys));
// TODO(YT) delete part files of the space
doSyncMultiRemoveAndUpdate(std::move(deleteKeys));
}

} // namespace meta
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/schemaMan/AlterEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ void AlterEdgeProcessor::process(const cpp2::AlterEdgeReq& req) {
data.emplace_back(MetaServiceUtils::schemaEdgeKey(req.get_space_id(), edgeType, version),
MetaServiceUtils::schemaEdgeVal(req.get_edge_name(), schema));
resp_.set_id(to(edgeType, EntryType::EDGE));
LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec());
doPut(std::move(data));
doSyncPutAndUpdate(std::move(data));
}

} // namespace meta
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/schemaMan/AlterTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ void AlterTagProcessor::process(const cpp2::AlterTagReq& req) {
data.emplace_back(MetaServiceUtils::schemaTagKey(req.get_space_id(), tagId, version),
MetaServiceUtils::schemaTagVal(req.get_tag_name(), schema));
resp_.set_id(to(tagId, EntryType::TAG));
LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec());
doPut(std::move(data));
doSyncPutAndUpdate(std::move(data));
}

} // namespace meta
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/schemaMan/CreateEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ void CreateEdgeProcessor::process(const cpp2::CreateEdgeReq& req) {
LOG(INFO) << "Create Edge " << edgeName << ", edgeType " << edgeType;
resp_.set_code(cpp2::ErrorCode::SUCCEEDED);
resp_.set_id(to(edgeType, EntryType::EDGE));
LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec());
doPut(std::move(data));
doSyncPutAndUpdate(std::move(data));
}

} // namespace meta
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/schemaMan/CreateTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ void CreateTagProcessor::process(const cpp2::CreateTagReq& req) {
LOG(INFO) << "Create Tag " << tagName << ", TagID " << tagId;
resp_.set_code(cpp2::ErrorCode::SUCCEEDED);
resp_.set_id(to(tagId, EntryType::TAG));
LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec());
doPut(std::move(data));
doSyncPutAndUpdate(std::move(data));
}

} // namespace meta
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/schemaMan/DropEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ void DropEdgeProcessor::process(const cpp2::DropEdgeReq& req) {
return;
}
resp_.set_code(cpp2::ErrorCode::SUCCEEDED);
LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec());
LOG(INFO) << "Drop Edge " << req.get_edge_name();
doMultiRemove(std::move(ret.value()));
doSyncMultiRemoveAndUpdate(std::move(ret.value()));
}

StatusOr<std::vector<std::string>> DropEdgeProcessor::getEdgeKeys(GraphSpaceID id,
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/schemaMan/DropTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ void DropTagProcessor::process(const cpp2::DropTagReq& req) {
return;
}
resp_.set_code(cpp2::ErrorCode::SUCCEEDED);
LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec());
LOG(INFO) << "Drop Tag " << req.get_tag_name();
doMultiRemove(std::move(ret.value()));
doSyncMultiRemoveAndUpdate(std::move(ret.value()));
}

StatusOr<std::vector<std::string>> DropTagProcessor::getTagKeys(GraphSpaceID id,
Expand Down

0 comments on commit 346e640

Please sign in to comment.