Skip to content

Commit

Permalink
Balance (vesoft-inc#3394)
Browse files Browse the repository at this point in the history
* new balancer adopted for zone rules

* alter space add zone

* make balanceTask bucket size be equal to distinct part number

* refactor zone balance

* fix balance parser

* fix rebase conflict

Co-authored-by: Sophie <[email protected]>
  • Loading branch information
liwenhui-soul and Sophie-Xie authored Dec 29, 2021
1 parent 22ff94a commit 62d8787
Show file tree
Hide file tree
Showing 56 changed files with 3,095 additions and 2,705 deletions.
1 change: 1 addition & 0 deletions .linters/cpp/checkKeyword.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
'KW_IGNORE_EXISTED_INDEX',
'KW_GEOGRAPHY',
'KW_DURATION',
'KW_ACROSS',
]


Expand Down
19 changes: 19 additions & 0 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,25 @@ folly::Future<StatusOr<std::vector<cpp2::HostItem>>> MetaClient::listHosts(cpp2:
return future;
}

folly::Future<StatusOr<bool>> MetaClient::alterSpace(const std::string& spaceName,
meta::cpp2::AlterSpaceOp op,
const std::vector<std::string>& paras) {
cpp2::AlterSpaceReq req;
req.op_ref() = op;
req.space_name_ref() = spaceName;
req.paras_ref() = paras;
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_alterSpace(request); },
[](cpp2::ExecResp&& resp) -> bool {
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
std::move(promise));
return future;
}

folly::Future<StatusOr<std::vector<cpp2::PartItem>>> MetaClient::listParts(
GraphSpaceID spaceId, std::vector<PartitionID> partIds) {
cpp2::ListPartsReq req;
Expand Down
4 changes: 4 additions & 0 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ class MetaClient {
folly::Future<StatusOr<std::vector<cpp2::HostItem>>> listHosts(
cpp2::ListHostType type = cpp2::ListHostType::ALLOC);

folly::Future<StatusOr<bool>> alterSpace(const std::string& spaceName,
meta::cpp2::AlterSpaceOp op,
const std::vector<std::string>& paras);

folly::Future<StatusOr<std::vector<cpp2::PartItem>>> listParts(GraphSpaceID spaceId,
std::vector<PartitionID> partIds);

Expand Down
3 changes: 3 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,9 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kArgument: {
return pool->add(new ArgumentExecutor(node, qctx));
}
case PlanNode::Kind::kAlterSpace: {
return pool->add(new AlterSpaceExecutor(node, qctx));
}
case PlanNode::Kind::kUnknown: {
LOG(FATAL) << "Unknown plan node kind " << static_cast<int32_t>(node->kind());
break;
Expand Down
17 changes: 17 additions & 0 deletions src/graph/executor/admin/SpaceExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,5 +270,22 @@ folly::Future<Status> ShowCreateSpaceExecutor::execute() {
.build());
});
}

folly::Future<Status> AlterSpaceExecutor::execute() {
SCOPED_TIMER(&execTime_);
auto *asnode = asNode<AlterSpace>(node());
return qctx()
->getMetaClient()
->alterSpace(asnode->getSpaceName(), asnode->getAlterSpaceOp(), asnode->getParas())
.via(runner())
.thenValue([this](StatusOr<bool> &&resp) {
SCOPED_TIMER(&execTime_);
if (!resp.ok()) {
LOG(ERROR) << resp.status().toString();
return std::move(resp).status();
}
return Status::OK();
});
}
} // namespace graph
} // namespace nebula
8 changes: 8 additions & 0 deletions src/graph/executor/admin/SpaceExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ class ShowCreateSpaceExecutor final : public Executor {

folly::Future<Status> execute() override;
};

class AlterSpaceExecutor final : public Executor {
public:
AlterSpaceExecutor(const PlanNode *node, QueryContext *qctx)
: Executor("AlterSpaceExecutor", node, qctx) {}

folly::Future<Status> execute() override;
};
} // namespace graph
} // namespace nebula

Expand Down
3 changes: 2 additions & 1 deletion src/graph/executor/admin/SubmitJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ Value SubmitJobExecutor::convertJobTimestampToDateTime(int64_t timestamp) {

nebula::DataSet SubmitJobExecutor::buildShowResultData(
const nebula::meta::cpp2::JobDesc &jd, const std::vector<nebula::meta::cpp2::TaskDesc> &td) {
if (jd.get_cmd() == meta::cpp2::AdminCmd::DATA_BALANCE) {
if (jd.get_cmd() == meta::cpp2::AdminCmd::DATA_BALANCE ||
jd.get_cmd() == meta::cpp2::AdminCmd::ZONE_BALANCE) {
nebula::DataSet v(
{"Job Id(spaceId:partId)", "Command(src->dst)", "Status", "Start Time", "Stop Time"});
const auto &paras = jd.get_paras();
Expand Down
38 changes: 38 additions & 0 deletions src/graph/planner/plan/Admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,44 @@ class DropSpace final : public SingleDependencyNode {
bool ifExists_;
};

class AlterSpace final : public SingleDependencyNode {
public:
static AlterSpace* make(QueryContext* qctx,
PlanNode* input,
const std::string& spaceName,
meta::cpp2::AlterSpaceOp op,
const std::vector<std::string>& paras) {
return qctx->objPool()->add(new AlterSpace(qctx, input, spaceName, op, paras));
}
const std::string& getSpaceName() const {
return spaceName_;
}

meta::cpp2::AlterSpaceOp getAlterSpaceOp() const {
return op_;
}

const std::vector<std::string>& getParas() const {
return paras_;
}

private:
AlterSpace(QueryContext* qctx,
PlanNode* input,
const std::string& spaceName,
meta::cpp2::AlterSpaceOp op,
const std::vector<std::string>& paras)
: SingleDependencyNode(qctx, Kind::kAlterSpace, input),
spaceName_(spaceName),
op_(op),
paras_(paras) {}

private:
std::string spaceName_;
meta::cpp2::AlterSpaceOp op_;
std::vector<std::string> paras_;
};

class DescSpace final : public SingleDependencyNode {
public:
static DescSpace* make(QueryContext* qctx, PlanNode* input, std::string spaceName) {
Expand Down
2 changes: 2 additions & 0 deletions src/graph/planner/plan/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ const char* PlanNode::toString(PlanNode::Kind kind) {
return "DropEdge";
case Kind::kShowSpaces:
return "ShowSpaces";
case Kind::kAlterSpace:
return "AlterSpaces";
case Kind::kShowTags:
return "ShowTags";
case Kind::kShowEdges:
Expand Down
1 change: 1 addition & 0 deletions src/graph/planner/plan/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class PlanNode {
kDropSpace,
kDropTag,
kDropEdge,
kAlterSpace,

// index related
kCreateTagIndex,
Expand Down
1 change: 1 addition & 0 deletions src/graph/service/PermissionCheck.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Status PermissionCheck::permissionCheck(ClientSession *session,
return Status::OK();
}
case Sentence::Kind::kCreateSpace:
case Sentence::Kind::kAlterSpace:
case Sentence::Kind::kCreateSpaceAs:
case Sentence::Kind::kDropSpace:
case Sentence::Kind::kCreateSnapshot:
Expand Down
1 change: 1 addition & 0 deletions src/graph/validator/AdminJobValidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class AdminJobValidator final : public Validator {
case meta::cpp2::AdminCmd::FLUSH:
case meta::cpp2::AdminCmd::DATA_BALANCE:
case meta::cpp2::AdminCmd::LEADER_BALANCE:
case meta::cpp2::AdminCmd::ZONE_BALANCE:
return true;
// TODO: Also space related, but not available in CreateJobExecutor now.
case meta::cpp2::AdminCmd::DOWNLOAD:
Expand Down
13 changes: 13 additions & 0 deletions src/graph/validator/AdminValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,19 @@ Status CreateSpaceAsValidator::toPlan() {
return Status::OK();
}

Status AlterSpaceValidator::validateImpl() {
return Status::OK();
}

Status AlterSpaceValidator::toPlan() {
auto sentence = static_cast<AlterSpaceSentence *>(sentence_);
auto *doNode = AlterSpace::make(
qctx_, nullptr, sentence->spaceName(), sentence->alterSpaceOp(), sentence->paras());
root_ = doNode;
tail_ = root_;
return Status::OK();
}

Status DescSpaceValidator::validateImpl() {
return Status::OK();
}
Expand Down
12 changes: 12 additions & 0 deletions src/graph/validator/AdminValidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ class CreateSpaceAsValidator final : public Validator {
std::string newSpaceName_;
};

class AlterSpaceValidator final : public Validator {
public:
AlterSpaceValidator(Sentence* sentence, QueryContext* context) : Validator(sentence, context) {
noSpaceRequired_ = true;
}

private:
Status validateImpl() override;

Status toPlan() override;
};

class DescSpaceValidator final : public Validator {
public:
DescSpaceValidator(Sentence* sentence, QueryContext* context) : Validator(sentence, context) {
Expand Down
2 changes: 2 additions & 0 deletions src/graph/validator/Validator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ std::unique_ptr<Validator> Validator::makeValidator(Sentence* sentence, QueryCon
return std::make_unique<ShowQueriesValidator>(sentence, context);
case Sentence::Kind::kKillQuery:
return std::make_unique<KillQueryValidator>(sentence, context);
case Sentence::Kind::kAlterSpace:
return std::make_unique<AlterSpaceValidator>(sentence, context);
case Sentence::Kind::kUnknown:
case Sentence::Kind::kReturn: {
// nothing
Expand Down
12 changes: 12 additions & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,16 @@ struct ExecResp {
3: common.HostAddr leader,
}

enum AlterSpaceOp {
ADD_ZONE = 0x01,
} (cpp.enum_strict)

struct AlterSpaceReq {
1: binary space_name,
2: AlterSpaceOp op,
3: list<binary> paras,
}

// Job related data structures
enum AdminJobOp {
ADD = 0x01,
Expand All @@ -235,6 +245,7 @@ enum AdminCmd {
DOWNLOAD = 7,
INGEST = 8,
LEADER_BALANCE = 9,
ZONE_BALANCE = 10,
UNKNOWN = 99,
} (cpp.enum_strict)

Expand Down Expand Up @@ -1168,6 +1179,7 @@ service MetaService {
ExecResp dropSpace(1: DropSpaceReq req);
GetSpaceResp getSpace(1: GetSpaceReq req);
ListSpacesResp listSpaces(1: ListSpacesReq req);
ExecResp alterSpace(1: AlterSpaceReq req);

ExecResp createSpaceAs(1: CreateSpaceAsReq req);

Expand Down
6 changes: 6 additions & 0 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ nebula_add_library(
processors/parts/ListSpacesProcessor.cpp
processors/parts/DropSpaceProcessor.cpp
processors/parts/GetPartsAllocProcessor.cpp
processors/parts/AlterSpaceProcessor.cpp
processors/schema/CreateTagProcessor.cpp
processors/schema/AlterTagProcessor.cpp
processors/schema/GetTagProcessor.cpp
Expand Down Expand Up @@ -68,11 +69,16 @@ nebula_add_library(
processors/job/AdminJobProcessor.cpp
processors/job/ReportTaskProcessor.cpp
processors/job/JobUtils.cpp
processors/job/StorageJobExecutor.cpp
processors/job/JobExecutor.cpp
processors/job/MetaJobExecutor.cpp
processors/job/SimpleConcurrentJobExecutor.cpp
processors/job/CompactJobExecutor.cpp
processors/job/FlushJobExecutor.cpp
processors/job/BalanceJobExecutor.cpp
processors/job/ZoneBalanceJobExecutor.cpp
processors/job/DataBalanceJobExecutor.cpp
processors/job/LeaderBalanceJobExecutor.cpp
processors/job/RebuildJobExecutor.cpp
processors/job/RebuildTagJobExecutor.cpp
processors/job/RebuildEdgeJobExecutor.cpp
Expand Down
7 changes: 7 additions & 0 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "meta/processors/kv/RemoveRangeProcessor.h"
#include "meta/processors/kv/ScanProcessor.h"
#include "meta/processors/listener/ListenerProcessor.h"
#include "meta/processors/parts/AlterSpaceProcessor.h"
#include "meta/processors/parts/CreateSpaceAsProcessor.h"
#include "meta/processors/parts/CreateSpaceProcessor.h"
#include "meta/processors/parts/DropSpaceProcessor.h"
Expand Down Expand Up @@ -86,6 +87,12 @@ folly::Future<cpp2::ExecResp> MetaServiceHandler::future_createSpace(
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp> MetaServiceHandler::future_alterSpace(
const cpp2::AlterSpaceReq& req) {
auto* processor = AlterSpaceProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp> MetaServiceHandler::future_createSpaceAs(
const cpp2::CreateSpaceAsReq& req) {
auto* processor = CreateSpaceAsProcessor::instance(kvstore_);
Expand Down
2 changes: 2 additions & 0 deletions src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
* */
folly::Future<cpp2::ExecResp> future_createSpace(const cpp2::CreateSpaceReq& req) override;

folly::Future<cpp2::ExecResp> future_alterSpace(const cpp2::AlterSpaceReq& req) override;

folly::Future<cpp2::ExecResp> future_createSpaceAs(const cpp2::CreateSpaceAsReq& req) override;

folly::Future<cpp2::ExecResp> future_dropSpace(const cpp2::DropSpaceReq& req) override;
Expand Down
Loading

0 comments on commit 62d8787

Please sign in to comment.