Skip to content

Commit

Permalink
To support delete in toss (vesoft-inc#3374)
Browse files Browse the repository at this point in the history
* To support delte in TOSS

dummy copy from chain add edges processor

reguraly update

some update

may compile

add some simple UT

adjusting UT

add explain DeleteEdgesRequest

add some UT

clear reverse table early

expose storage interface

update license & fix gcc compile error

fix compile error

update license

fix chain interface bug

fix mem-leak

rename some processor

continue rename Processor

add some log for debug

print readable edge

debug keys when delete

print conflict key and scanned key

print rank

delPrime delete the wrong key from memory lock

fix raft may rollback want we scanned

* rebase master

* fix format

* fix compile error

* fix lint

* address comments

* fix lint

* looks like some test may fail due to not stop kvstore

Co-authored-by: Doodle <[email protected]>
Co-authored-by: Sophie <[email protected]>
  • Loading branch information
3 people authored Dec 29, 2021
1 parent 3e71921 commit 8c2e50f
Show file tree
Hide file tree
Showing 59 changed files with 2,150 additions and 610 deletions.
45 changes: 44 additions & 1 deletion src/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq,
auto partId = directReq.get_parts().begin()->first;
auto optLeader = getLeader(directReq.get_space_id(), partId);
if (!optLeader.ok()) {
LOG(WARNING) << folly::sformat("failed to get leader, space {}, part {}", spaceId, partId);
LOG(WARNING) << folly::sformat("failed to get leader, space {}, part {}", spaceId, partId)
<< optLeader.status();
p.setValue(::nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND);
return;
}
Expand Down Expand Up @@ -131,5 +132,47 @@ cpp2::ChainAddEdgesRequest InternalStorageClient::makeChainAddReq(const cpp2::Ad
return ret;
}

void InternalStorageClient::chainDeleteEdges(cpp2::DeleteEdgesRequest& req,
const std::string& txnId,
TermID termId,
folly::Promise<nebula::cpp2::ErrorCode>&& p,
folly::EventBase* evb) {
auto spaceId = req.get_space_id();
auto partId = req.get_parts().begin()->first;
auto optLeader = getLeader(req.get_space_id(), partId);
if (!optLeader.ok()) {
LOG(WARNING) << folly::sformat("failed to get leader, space {}, part {}", spaceId, partId)
<< optLeader.status();
p.setValue(::nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND);
return;
}
HostAddr& leader = optLeader.value();
leader.port += kInternalPortOffset;
VLOG(2) << "leader host: " << leader;

cpp2::ChainDeleteEdgesRequest chainReq;
chainReq.space_id_ref() = req.get_space_id();
chainReq.parts_ref() = req.get_parts();
chainReq.txn_id_ref() = txnId;
chainReq.term_ref() = termId;
auto resp = getResponse(
evb,
std::make_pair(leader, chainReq),
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainDeleteEdgesRequest& r) {
return client->future_chainDeleteEdges(r);
});

std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = getErrorCode(t);
if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
chainDeleteEdges(req, txnId, termId, std::move(p));
} else {
p.setValue(code);
}
return;
});
}

} // namespace storage
} // namespace nebula
6 changes: 6 additions & 0 deletions src/clients/storage/InternalStorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ class InternalStorageClient
folly::Promise<::nebula::cpp2::ErrorCode>&& p,
folly::EventBase* evb = nullptr);

virtual void chainDeleteEdges(cpp2::DeleteEdgesRequest& req,
const std::string& txnId,
TermID termId,
folly::Promise<::nebula::cpp2::ErrorCode>&& p,
folly::EventBase* evb = nullptr);

private:
cpp2::ChainAddEdgesRequest makeChainAddReq(const cpp2::AddEdgesRequest& req,
TermID termId,
Expand Down
6 changes: 4 additions & 2 deletions src/clients/storage/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,10 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::deleteEdges(

return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::DeleteEdgesRequest& r) {
return client->future_deleteEdges(r);
[useToss = param.useExperimentalFeature](
ThriftClientType* client, const cpp2::DeleteEdgesRequest& r) {
return useToss ? client->future_chainDeleteEdges(r)
: client->future_deleteEdges(r);
});
}

Expand Down
2 changes: 2 additions & 0 deletions src/common/utils/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ enum class NebulaKeyType : uint32_t {
kOperation = 0x00000005,
kKeyValue = 0x00000006,
kVertex = 0x00000007,
kPrime = 0x00000008, // used in TOSS, if we write a lock succeed
kDoublePrime = 0x00000009, // used in TOSS, if we get RPC back from remote.
};

enum class NebulaSystemKeyType : uint32_t {
Expand Down
2 changes: 2 additions & 0 deletions src/graph/executor/mutate/DeleteExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "graph/context/QueryContext.h"
#include "graph/executor/mutate/DeleteExecutor.h"
#include "graph/planner/plan/Mutate.h"
#include "graph/service/GraphFlags.h"
#include "graph/util/SchemaUtil.h"

using nebula::storage::StorageClient;
Expand Down Expand Up @@ -208,6 +209,7 @@ folly::Future<Status> DeleteEdgesExecutor::deleteEdges() {
auto plan = qctx()->plan();
StorageClient::CommonRequestParam param(
spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
param.useExperimentalFeature = FLAGS_enable_experimental_feature;
return qctx()
->getStorageClient()
->deleteEdges(param, std::move(edgeKeys))
Expand Down
23 changes: 11 additions & 12 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ service GraphStorageService {

UpdateResponse chainUpdateEdge(1: UpdateEdgeRequest req);
ExecResponse chainAddEdges(1: AddEdgesRequest req);
ExecResponse chainDeleteEdges(1: DeleteEdgesRequest req);

KVGetResponse get(1: KVGetRequest req);
ExecResponse put(1: KVPutRequest req);
Expand Down Expand Up @@ -854,17 +855,6 @@ service StorageAdminService {
//
//////////////////////////////////////////////////////////

// transaction request
struct InternalTxnRequest {
1: i64 txn_id,
2: map<common.PartitionID, i64> term_of_parts,
3: optional AddEdgesRequest add_edge_req,
4: optional UpdateEdgeRequest upd_edge_req,
5: optional map<common.PartitionID, list<i64>>(
cpp.template = "std::unordered_map") edge_ver,
}


struct ChainAddEdgesRequest {
1: common.GraphSpaceID space_id,
// partId => edges
Expand All @@ -875,7 +865,6 @@ struct ChainAddEdgesRequest {
3: list<binary> prop_names,
// if true, when edge already exists, do nothing
4: bool if_not_exists,
// 5: map<common.PartitionID, i64> term_of_parts,
5: i64 term
6: optional i64 edge_version
// 6: optional map<common.PartitionID, list<i64>>(
Expand All @@ -891,7 +880,17 @@ struct ChainUpdateEdgeRequest {
5: required list<common.PartitionID> parts,
}

struct ChainDeleteEdgesRequest {
1: common.GraphSpaceID space_id,
// partId => edgeKeys
2: map<common.PartitionID, list<EdgeKey>>
(cpp.template = "std::unordered_map") parts,
3: binary txn_id
4: i64 term,
}

service InternalStorageService {
ExecResponse chainAddEdges(1: ChainAddEdgesRequest req);
UpdateResponse chainUpdateEdge(1: ChainUpdateEdgeRequest req);
ExecResponse chainDeleteEdges(1: ChainDeleteEdgesRequest req);
}
4 changes: 2 additions & 2 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> Part::commitLogs(
// Make the number of values are an even number
DCHECK_EQ((kvs.size() + 1) / 2, kvs.size() / 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
VLOG(1) << "OP_MULTI_PUT " << folly::hexlify(kvs[i])
VLOG(2) << "OP_MULTI_PUT " << folly::hexlify(kvs[i])
<< ", val = " << folly::hexlify(kvs[i + 1]);
auto code = batch->put(kvs[i], kvs[i + 1]);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand Down Expand Up @@ -295,7 +295,7 @@ std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> Part::commitLogs(
case OP_BATCH_WRITE: {
auto data = decodeBatchValue(log);
for (auto& op : data) {
VLOG(1) << "OP_BATCH_WRITE: " << folly::hexlify(op.second.first)
VLOG(2) << "OP_BATCH_WRITE: " << folly::hexlify(op.second.first)
<< ", val=" << folly::hexlify(op.second.second);
auto code = nebula::cpp2::ErrorCode::SUCCEEDED;
if (op.first == BatchLogType::OP_BATCH_PUT) {
Expand Down
1 change: 0 additions & 1 deletion src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2026,7 +2026,6 @@ void RaftPart::checkRemoteListeners(const std::set<HostAddr>& expected) {
}
}
}

bool RaftPart::leaseValid() {
std::lock_guard<std::mutex> g(raftLock_);
if (hosts_.empty()) {
Expand Down
12 changes: 8 additions & 4 deletions src/mock/MockData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ std::vector<VertexID> MockData::mockPlayerVerticeIds() {
return ret;
}

std::vector<EdgeData> MockData::mockEdges(bool upper) {
std::vector<EdgeData> MockData::mockEdges(bool upper, bool hasInEdges) {
std::vector<EdgeData> ret;
// Use serve data, positive edgeType is 101, reverse edgeType is -101
for (auto& serve : serves_) {
Expand Down Expand Up @@ -788,7 +788,9 @@ std::vector<EdgeData> MockData::mockEdges(bool upper) {
positiveEdge.props_ = std::move(props);
auto reverseData = getReverseEdge(positiveEdge);
ret.emplace_back(std::move(positiveEdge));
ret.emplace_back(std::move(reverseData));
if (hasInEdges) {
ret.emplace_back(std::move(reverseData));
}
}
return ret;
}
Expand Down Expand Up @@ -947,11 +949,13 @@ nebula::storage::cpp2::DeleteVerticesRequest MockData::mockDeleteVerticesReq(int
return req;
}

nebula::storage::cpp2::AddEdgesRequest MockData::mockAddEdgesReq(bool upper, int32_t parts) {
nebula::storage::cpp2::AddEdgesRequest MockData::mockAddEdgesReq(bool upper,
int32_t parts,
bool hasInEdges) {
nebula::storage::cpp2::AddEdgesRequest req;
req.space_id_ref() = 1;
req.if_not_exists_ref() = true;
auto retRecs = mockEdges(upper);
auto retRecs = mockEdges(upper, hasInEdges);
for (auto& rec : retRecs) {
nebula::storage::cpp2::NewEdge newEdge;
nebula::storage::cpp2::EdgeKey edgeKey;
Expand Down
6 changes: 4 additions & 2 deletions src/mock/MockData.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ class MockData {
static std::vector<std::pair<PartitionID, std::string>> mockPlayerIndexKeys(bool upper = false);

// generate serve edge
static std::vector<EdgeData> mockEdges(bool upper = false);
// param: includeInEdges, if the return set has both out and in edges
static std::vector<EdgeData> mockEdges(bool upper = false, bool includeInEdges = true);

static std::vector<std::pair<PartitionID, std::string>> mockServeIndexKeys();

Expand Down Expand Up @@ -169,7 +170,8 @@ class MockData {
int32_t parts = 6);

static nebula::storage::cpp2::AddEdgesRequest mockAddEdgesReq(bool upper = false,
int32_t parts = 6);
int32_t parts = 6,
bool hasInEdges = true);

static nebula::storage::cpp2::DeleteVerticesRequest mockDeleteVerticesReq(int32_t parts = 6);

Expand Down
13 changes: 9 additions & 4 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,22 @@ nebula_add_library(
storage_transaction_executor OBJECT
transaction/TransactionManager.cpp
transaction/ConsistUtil.cpp
transaction/ChainUpdateEdgeProcessorLocal.cpp
transaction/ChainUpdateEdgeProcessorRemote.cpp
transaction/ChainUpdateEdgeLocalProcessor.cpp
transaction/ChainUpdateEdgeRemoteProcessor.cpp
transaction/ChainResumeProcessor.cpp
transaction/ChainAddEdgesGroupProcessor.cpp
transaction/ChainAddEdgesProcessorLocal.cpp
transaction/ChainAddEdgesProcessorRemote.cpp
transaction/ChainAddEdgesLocalProcessor.cpp
transaction/ChainAddEdgesRemoteProcessor.cpp
transaction/ResumeAddEdgeProcessor.cpp
transaction/ResumeAddEdgeRemoteProcessor.cpp
transaction/ResumeUpdateProcessor.cpp
transaction/ResumeUpdateRemoteProcessor.cpp
transaction/ChainProcessorFactory.cpp
transaction/ChainDeleteEdgesGroupProcessor.cpp
transaction/ChainDeleteEdgesLocalProcessor.cpp
transaction/ChainDeleteEdgesRemoteProcessor.cpp
transaction/ChainDeleteEdgesResumeProcessor.cpp
transaction/ChainDeleteEdgesResumeRemoteProcessor.cpp
)

nebula_add_library(
Expand Down
11 changes: 9 additions & 2 deletions src/storage/GraphStorageServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
#include "storage/query/ScanEdgeProcessor.h"
#include "storage/query/ScanVertexProcessor.h"
#include "storage/transaction/ChainAddEdgesGroupProcessor.h"
#include "storage/transaction/ChainUpdateEdgeProcessorLocal.h"
#include "storage/transaction/ChainDeleteEdgesGroupProcessor.h"
#include "storage/transaction/ChainUpdateEdgeLocalProcessor.h"

#define RETURN_FUTURE(processor) \
auto f = processor->getFuture(); \
Expand Down Expand Up @@ -112,7 +113,7 @@ folly::Future<cpp2::UpdateResponse> GraphStorageServiceHandler::future_updateEdg

folly::Future<cpp2::UpdateResponse> GraphStorageServiceHandler::future_chainUpdateEdge(
const cpp2::UpdateEdgeRequest& req) {
auto* proc = ChainUpdateEdgeProcessorLocal::instance(env_);
auto* proc = ChainUpdateEdgeLocalProcessor::instance(env_);
RETURN_FUTURE(proc);
}

Expand Down Expand Up @@ -160,6 +161,12 @@ folly::Future<cpp2::ExecResponse> GraphStorageServiceHandler::future_chainAddEdg
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResponse> GraphStorageServiceHandler::future_chainDeleteEdges(
const cpp2::DeleteEdgesRequest& req) {
auto* processor = ChainDeleteEdgesGroupProcessor::instance(env_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResponse> GraphStorageServiceHandler::future_put(
const cpp2::KVPutRequest& req) {
auto* processor = PutProcessor::instance(env_);
Expand Down
3 changes: 3 additions & 0 deletions src/storage/GraphStorageServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class GraphStorageServiceHandler final : public cpp2::GraphStorageServiceSvIf {

folly::Future<cpp2::ScanResponse> future_scanVertex(const cpp2::ScanVertexRequest& req) override;

folly::Future<cpp2::ExecResponse> future_chainDeleteEdges(
const cpp2::DeleteEdgesRequest& req) override;

folly::Future<cpp2::ScanResponse> future_scanEdge(const cpp2::ScanEdgeRequest& req) override;

folly::Future<cpp2::GetUUIDResp> future_getUUID(const cpp2::GetUUIDReq& req) override;
Expand Down
15 changes: 11 additions & 4 deletions src/storage/InternalStorageServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

#include "storage/InternalStorageServiceHandler.h"

#include "storage/transaction/ChainAddEdgesProcessorRemote.h"
#include "storage/transaction/ChainUpdateEdgeProcessorRemote.h"
#include "storage/transaction/ChainAddEdgesRemoteProcessor.h"
#include "storage/transaction/ChainDeleteEdgesRemoteProcessor.h"
#include "storage/transaction/ChainUpdateEdgeRemoteProcessor.h"

#define RETURN_FUTURE(processor) \
auto f = processor->getFuture(); \
Expand All @@ -20,13 +21,19 @@ InternalStorageServiceHandler::InternalStorageServiceHandler(StorageEnv* env) :

folly::Future<cpp2::ExecResponse> InternalStorageServiceHandler::future_chainAddEdges(
const cpp2::ChainAddEdgesRequest& req) {
auto* processor = ChainAddEdgesProcessorRemote::instance(env_);
auto* processor = ChainAddEdgesRemoteProcessor::instance(env_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::UpdateResponse> InternalStorageServiceHandler::future_chainUpdateEdge(
const cpp2::ChainUpdateEdgeRequest& req) {
auto* processor = ChainUpdateEdgeProcessorRemote::instance(env_);
auto* processor = ChainUpdateEdgeRemoteProcessor::instance(env_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResponse> InternalStorageServiceHandler::future_chainDeleteEdges(
const cpp2::ChainDeleteEdgesRequest& req) {
auto* processor = ChainDeleteEdgesRemoteProcessor::instance(env_);
RETURN_FUTURE(processor);
}

Expand Down
3 changes: 3 additions & 0 deletions src/storage/InternalStorageServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ class InternalStorageServiceHandler final : public cpp2::InternalStorageServiceS
folly::Future<cpp2::UpdateResponse> future_chainUpdateEdge(
const cpp2::ChainUpdateEdgeRequest& p_req);

folly::Future<cpp2::ExecResponse> future_chainDeleteEdges(
const cpp2::ChainDeleteEdgesRequest& p_req);

private:
StorageEnv* env_{nullptr};
};
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/AddEdgesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ extern ProcessorCounters kAddEdgesCounters;

class AddEdgesProcessor : public BaseProcessor<cpp2::ExecResponse> {
friend class TransactionManager;
friend class ChainAddEdgesProcessorLocal;
friend class ChainAddEdgesLocalProcessor;

public:
static AddEdgesProcessor* instance(StorageEnv* env,
Expand Down
Loading

0 comments on commit 8c2e50f

Please sign in to comment.