diff --git a/CMakeLists.txt b/CMakeLists.txt
index 8d9dee102f0..2e6729faa47 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -31,6 +31,10 @@ option(ENABLE_INCLUDE_WHAT_YOU_USE "Enable include-what-you-use find nouse incl
 
 add_definitions(-DNEBULA_HOME=${CMAKE_SOURCE_DIR})
 
+if(ENABLE_STANDALONE_VERSION)
+    add_definitions(-DBUILD_STANDALONE)
+endif()
+
 list(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake)
 list(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake/nebula)
 
diff --git a/cmake/nebula/GeneralCMakeOptions.cmake b/cmake/nebula/GeneralCMakeOptions.cmake
index 8fc3fd7a821..55583470f4d 100644
--- a/cmake/nebula/GeneralCMakeOptions.cmake
+++ b/cmake/nebula/GeneralCMakeOptions.cmake
@@ -18,6 +18,7 @@ option(ENABLE_COMPRESSED_DEBUG_INFO "Compress debug info to reduce binary si
 option(ENABLE_CLANG_TIDY "Enable clang-tidy if present" OFF)
 option(ENABLE_GDB_SCRIPT_SECTION "Add .debug_gdb_scripts section" OFF)
 option(DISABLE_CXX11_ABI "Whether to disable cxx11 abi" OFF)
+option(ENABLE_STANDALONE_VERSION "Enable standalone version build" OFF)
 
 get_cmake_property(variable_list VARIABLES)
 foreach(_varname ${variable_list})
diff --git a/conf/CMakeLists.txt b/conf/CMakeLists.txt
index d75644e1fbf..0ec9079790c 100644
--- a/conf/CMakeLists.txt
+++ b/conf/CMakeLists.txt
@@ -5,6 +5,8 @@
 # These configuration files are for reference to generate your own customized ones.
 # Thus, they are installed as read-only, even for the owner.
 
+if(NOT ENABLE_STANDALONE_VERSION)
+
 install(
     FILES
         nebula-graphd.conf.default
@@ -47,3 +49,20 @@ install(
     COMPONENT
         storage
 )
+
+else()
+
+install(
+    FILES
+        nebula-standalone.conf.default
+    PERMISSIONS
+        OWNER_READ
+        GROUP_READ
+        WORLD_READ
+    DESTINATION
+        etc
+    COMPONENT
+        graph
+)
+
+endif()
diff --git a/conf/nebula-standalone.conf.default b/conf/nebula-standalone.conf.default
new file mode 100644
index 00000000000..b356ef21691
--- /dev/null
+++ b/conf/nebula-standalone.conf.default
@@ -0,0 +1,164 @@
+########## basics ##########
+# Whether to run as a daemon process
+--daemonize=true
+# The file to host the process id
+--pid_file=pids/nebula-standalone.pid
+# Whether to enable optimizer
+--enable_optimizer=true
+# The default charset when a space is created
+--default_charset=utf8
+# The default collate when a space is created
+--default_collate=utf8_bin
+# Whether to use the configuration obtained from the configuration file
+--local_config=true
+
+########## logging ##########
+# The directory to host logging files
+--log_dir=logs
+# Log level, 0, 1, 2, 3 for INFO, WARNING, ERROR, FATAL respectively
+--minloglevel=0
+# Verbose log level, 1, 2, 3, 4, the higher the level, the more verbose the logging
+--v=0
+# Maximum seconds to buffer the log messages
+--logbufsecs=0
+# Whether to redirect stdout and stderr to separate output files
+--redirect_stdout=true
+# Destination filename of stdout and stderr, which will also reside in log_dir.
+--stdout_log_file=standalone-stdout.log
+--stderr_log_file=standalone-stderr.log
+# Copy log messages at or above this level to stderr in addition to logfiles. The numbers of severity levels INFO, WARNING, ERROR, and FATAL are 0, 1, 2, and 3, respectively.
+--stderrthreshold=2
+
+########## query ##########
+# Whether to treat partial success as an error.
+# This flag is only used for Read-only access, and Modify access always treats partial success as an error.
+--accept_partial_success=false +# Maximum sentence length, unit byte +--max_allowed_query_size=4194304 + +########## networking ########## +# Comma separated Meta Server Addresses +--meta_server_addrs=127.0.0.1:9559 +# Local IP used to identify the nebula-graphd process. +# Change it to an address other than loopback if the service is distributed or +# will be accessed remotely. +--local_ip=127.0.0.1 +# Network device to listen on +--listen_netdev=any +# Port to listen on +--port=9669 +--meta_port=9559 +--storage_port=9779 +# To turn on SO_REUSEPORT or not +--reuse_port=false +# Backlog of the listen socket, adjust this together with net.core.somaxconn +--listen_backlog=1024 +# The number of seconds Nebula service waits before closing the idle connections +--client_idle_timeout_secs=28800 +# The number of seconds before idle sessions expire +# The range should be in [1, 604800] +--session_idle_timeout_secs=28800 +# The number of threads to accept incoming connections +--num_accept_threads=1 +# The number of networking IO threads, 0 for # of CPU cores +--num_netio_threads=0 +# The number of threads to execute user queries, 0 for # of CPU cores +--num_worker_threads=0 +# HTTP service ip +--ws_ip=0.0.0.0 +# HTTP service port +--ws_http_port=19669 +# HTTP2 service port +--ws_h2_port=19670 +# storage client timeout +--storage_client_timeout_ms=60000 +# Port to listen on Meta with HTTP protocol, it corresponds to ws_http_port in metad's configuration file +--ws_meta_http_port=19559 +# HTTP service port +--ws_storage_http_port=19779 +# HTTP2 service port +--ws_storage_h2_port=19780 +# heartbeat with meta service +--heartbeat_interval_secs=10 + + +########## authentication ########## +# Enable authorization +--enable_authorize=false +# User login authentication type, password for nebula authentication, ldap for ldap authentication, cloud for cloud authentication +--auth_type=password + +########## memory ########## +# System memory high watermark ratio +--system_memory_high_watermark_ratio=0.8 + +########## experimental feature ########## +# if use experimental features +--enable_experimental_feature=false + +######### Raft ######### +# Raft election timeout +--raft_heartbeat_interval_secs=30 +# RPC timeout for raft client (ms) +--raft_rpc_timeout_ms=500 +## recycle Raft WAL +--wal_ttl=14400 + +########## Disk ########## +# Root data path. Split by comma. e.g. --data_path=/disk1/path1/,/disk2/path2/ +# One path per Rocksdb instance. +--data_path=data/storage + +# Minimum reserved bytes of each data path +--minimum_reserved_bytes=268435456 + +# The default reserved bytes for one batch operation +--rocksdb_batch_size=4096 +# The default block cache size used in BlockBasedTable. +# The unit is MB. +--rocksdb_block_cache=4 +# The type of storage engine, `rocksdb', `memory', etc. +--engine_type=rocksdb + +# Compression algorithm, options: no,snappy,lz4,lz4hc,zlib,bzip2,zstd +# For the sake of binary compatibility, the default value is snappy. 
+# Recommend to use: +# * lz4 to gain more CPU performance, with the same compression ratio with snappy +# * zstd to occupy less disk space +# * lz4hc for the read-heavy write-light scenario +--rocksdb_compression=lz4 + +# Set different compressions for different levels +# For example, if --rocksdb_compression is snappy, +# "no:no:lz4:lz4::zstd" is identical to "no:no:lz4:lz4:snappy:zstd:snappy" +# In order to disable compression for level 0/1, set it to "no:no" +--rocksdb_compression_per_level= + +# Whether or not to enable rocksdb's statistics, disabled by default +--enable_rocksdb_statistics=false + +# Statslevel used by rocksdb to collection statistics, optional values are +# * kExceptHistogramOrTimers, disable timer stats, and skip histogram stats +# * kExceptTimers, Skip timer stats +# * kExceptDetailedTimers, Collect all stats except time inside mutex lock AND time spent on compression. +# * kExceptTimeForMutex, Collect all stats except the counters requiring to get time inside the mutex lock. +# * kAll, Collect all stats +--rocksdb_stats_level=kExceptHistogramOrTimers + +# Whether or not to enable rocksdb's prefix bloom filter, enabled by default. +--enable_rocksdb_prefix_filtering=true +# Whether or not to enable rocksdb's whole key bloom filter, disabled by default. +--enable_rocksdb_whole_key_filtering=false + +############## rocksdb Options ############## +# rocksdb DBOptions in json, each name and value of option is a string, given as "option_name":"option_value" separated by comma +--rocksdb_db_options={} +# rocksdb ColumnFamilyOptions in json, each name and value of option is string, given as "option_name":"option_value" separated by comma +--rocksdb_column_family_options={"write_buffer_size":"67108864","max_write_buffer_number":"4","max_bytes_for_level_base":"268435456"} +# rocksdb BlockBasedTableOptions in json, each name and value of option is string, given as "option_name":"option_value" separated by comma +--rocksdb_block_based_table_options={"block_size":"8192"} + +############## meta Options ############## +--meta_data_path=data/meta +--default_replica_factor=1 +--default_parts_num=100 diff --git a/scripts/CMakeLists.txt b/scripts/CMakeLists.txt index 5d375d2bd14..02cbf67a2c8 100644 --- a/scripts/CMakeLists.txt +++ b/scripts/CMakeLists.txt @@ -2,6 +2,8 @@ # # This source code is licensed under Apache 2.0 License. +if(NOT ENABLE_STANDALONE_VERSION) + install( FILES nebula-storaged.service @@ -30,7 +32,7 @@ install( install( FILES - nebula.service + nebula-graphd.service PERMISSIONS OWNER_EXECUTE OWNER_WRITE OWNER_READ GROUP_EXECUTE GROUP_READ @@ -38,12 +40,29 @@ install( DESTINATION scripts COMPONENT - common + graph ) +else() + install( FILES - utils.sh + nebula-standalone.service + PERMISSIONS + OWNER_EXECUTE OWNER_WRITE OWNER_READ + GROUP_EXECUTE GROUP_READ + WORLD_EXECUTE WORLD_READ + DESTINATION + scripts + COMPONENT + graph +) + +endif() + +install( + FILES + nebula.service PERMISSIONS OWNER_EXECUTE OWNER_WRITE OWNER_READ GROUP_EXECUTE GROUP_READ @@ -56,7 +75,7 @@ install( install( FILES - nebula-graphd.service + utils.sh PERMISSIONS OWNER_EXECUTE OWNER_WRITE OWNER_READ GROUP_EXECUTE GROUP_READ @@ -64,7 +83,7 @@ install( DESTINATION scripts COMPONENT - graph + common ) install( diff --git a/scripts/nebula-standalone.service b/scripts/nebula-standalone.service new file mode 100644 index 00000000000..3405081cb0b --- /dev/null +++ b/scripts/nebula-standalone.service @@ -0,0 +1,6 @@ +#! 
/bin/bash + +SCRIPT_PATH=$(readlink -f $0) +SCRIPT_DIR=$(dirname ${SCRIPT_PATH}) +export USAGE_INFO="${0} [-v] [-c /path/to/conf] " +${SCRIPT_DIR}/nebula.service $@ standalone diff --git a/scripts/nebula.service b/scripts/nebula.service index 04f38c074f6..7fea4a2f977 100755 --- a/scripts/nebula.service +++ b/scripts/nebula.service @@ -77,6 +77,13 @@ fi ACTION=${1} TARGET=${2} +if [ -e ${INSTALL_ROOT}/bin/nebula-standalone ] +then + ALLTARGET=(standalone) +else + ALLTARGET=(metad graphd storaged) +fi + # Collect the daemons on which we perform the action on case ${TARGET} in metad) @@ -88,8 +95,11 @@ case ${TARGET} in storaged) TARGETS=(${TARGET}) ;; + standalone) + TARGETS=(${TARGET}) + ;; all) - TARGETS=(metad graphd storaged) + TARGETS=${ALLTARGET[*]} ;; *) ERROR "Unknown daemon \`${DAEMON}'" diff --git a/src/clients/storage/GeneralStorageClient.h b/src/clients/storage/GeneralStorageClient.h new file mode 100644 index 00000000000..805c37d7a90 --- /dev/null +++ b/src/clients/storage/GeneralStorageClient.h @@ -0,0 +1,56 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef CLIENTS_STORAGE_GENERALSTORAGECLIENT_H_ +#define CLIENTS_STORAGE_GENERALSTORAGECLIENT_H_ + +#include + +#include "clients/meta/MetaClient.h" +#include "clients/storage/StorageClientBase.h" +#include "common/base/Base.h" +#include "common/datatypes/KeyValue.h" +#include "common/thrift/ThriftClientManager.h" +#include "interface/gen-cpp2/GeneralStorageServiceAsyncClient.h" + +namespace nebula { +namespace storage { + +/** + * A wrapper class for GeneralStorageServiceAsyncClient thrift API + * + * The class is NOT reentrant + */ +class GeneralStorageClient + : public StorageClientBase< + cpp2::GeneralStorageServiceAsyncClient, + thrift::ThriftClientManager> { + using Parent = + StorageClientBase>; + + public: + GeneralStorageClient(std::shared_ptr ioThreadPool, + meta::MetaClient* metaClient) + : Parent(ioThreadPool, metaClient) {} + virtual ~GeneralStorageClient() {} + + folly::SemiFuture> get(GraphSpaceID space, + std::vector&& keys, + bool returnPartly = false, + folly::EventBase* evb = nullptr); + + folly::SemiFuture> put(GraphSpaceID space, + std::vector kvs, + folly::EventBase* evb = nullptr); + + folly::SemiFuture> remove(GraphSpaceID space, + std::vector keys, + folly::EventBase* evb = nullptr); +}; + +} // namespace storage +} // namespace nebula +#endif // CLIENTS_STORAGE_GENERALSTORAGECLIENT_H_ diff --git a/src/clients/storage/InternalStorageClient.h b/src/clients/storage/InternalStorageClient.h index fe16b78c0a0..f34311f605a 100644 --- a/src/clients/storage/InternalStorageClient.h +++ b/src/clients/storage/InternalStorageClient.h @@ -11,6 +11,7 @@ #include "clients/storage/StorageClientBase.h" #include "common/base/Base.h" #include "common/base/ErrorOr.h" +#include "common/thrift/ThriftClientManager.h" #include "interface/gen-cpp2/InternalStorageServiceAsyncClient.h" namespace nebula { @@ -21,8 +22,13 @@ namespace storage { * * The class is NOT reentrant */ -class InternalStorageClient : public StorageClientBase { - using Parent = StorageClientBase; +class InternalStorageClient + : public StorageClientBase< + cpp2::InternalStorageServiceAsyncClient, + thrift::ThriftClientManager> { + using Parent = + StorageClientBase>; public: InternalStorageClient(std::shared_ptr ioThreadPool, diff --git a/src/clients/storage/StorageClient.cpp b/src/clients/storage/StorageClient.cpp index 41cb0928715..61381598225 100644 --- 
a/src/clients/storage/StorageClient.cpp +++ b/src/clients/storage/StorageClient.cpp @@ -100,12 +100,11 @@ StorageRpcRespFuture StorageClient::getNeighbors( req.traverse_spec_ref() = std::move(spec); } - return collectResponse( - param.evb, - std::move(requests), - [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::GetNeighborsRequest& r) { - return client->future_getNeighbors(r); - }); + return collectResponse(param.evb, + std::move(requests), + [](ThriftClientType* client, const cpp2::GetNeighborsRequest& r) { + return client->future_getNeighbors(r); + }); } StorageRpcRespFuture StorageClient::addVertices( @@ -140,12 +139,11 @@ StorageRpcRespFuture StorageClient::addVertices( req.common_ref() = common; } - return collectResponse( - param.evb, - std::move(requests), - [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::AddVerticesRequest& r) { - return client->future_addVertices(r); - }); + return collectResponse(param.evb, + std::move(requests), + [](ThriftClientType* client, const cpp2::AddVerticesRequest& r) { + return client->future_addVertices(r); + }); } StorageRpcRespFuture StorageClient::addEdges(const CommonRequestParam& param, @@ -178,13 +176,13 @@ StorageRpcRespFuture StorageClient::addEdges(const CommonReq req.prop_names_ref() = propNames; req.common_ref() = common; } - return collectResponse( - param.evb, - std::move(requests), - [useToss = param.useExperimentalFeature](cpp2::GraphStorageServiceAsyncClient* client, - const cpp2::AddEdgesRequest& r) { - return useToss ? client->future_chainAddEdges(r) : client->future_addEdges(r); - }); + return collectResponse(param.evb, + std::move(requests), + [useToss = param.useExperimentalFeature](ThriftClientType* client, + const cpp2::AddEdgesRequest& r) { + return useToss ? client->future_chainAddEdges(r) + : client->future_addEdges(r); + }); } StorageRpcRespFuture StorageClient::getProps( @@ -237,10 +235,10 @@ StorageRpcRespFuture StorageClient::getProps( req.common_ref() = common; } - return collectResponse(param.evb, - std::move(requests), - [](cpp2::GraphStorageServiceAsyncClient* client, - const cpp2::GetPropRequest& r) { return client->future_getProps(r); }); + return collectResponse( + param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::GetPropRequest& r) { + return client->future_getProps(r); + }); } StorageRpcRespFuture StorageClient::deleteEdges( @@ -268,12 +266,11 @@ StorageRpcRespFuture StorageClient::deleteEdges( req.common_ref() = common; } - return collectResponse( - param.evb, - std::move(requests), - [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::DeleteEdgesRequest& r) { - return client->future_deleteEdges(r); - }); + return collectResponse(param.evb, + std::move(requests), + [](ThriftClientType* client, const cpp2::DeleteEdgesRequest& r) { + return client->future_deleteEdges(r); + }); } StorageRpcRespFuture StorageClient::deleteVertices( @@ -301,12 +298,11 @@ StorageRpcRespFuture StorageClient::deleteVertices( req.common_ref() = common; } - return collectResponse( - param.evb, - std::move(requests), - [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::DeleteVerticesRequest& r) { - return client->future_deleteVertices(r); - }); + return collectResponse(param.evb, + std::move(requests), + [](ThriftClientType* client, const cpp2::DeleteVerticesRequest& r) { + return client->future_deleteVertices(r); + }); } StorageRpcRespFuture StorageClient::deleteTags( @@ -334,12 +330,11 @@ StorageRpcRespFuture StorageClient::deleteTags( req.common_ref() = common; } - 
return collectResponse( - param.evb, - std::move(requests), - [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::DeleteTagsRequest& r) { - return client->future_deleteTags(r); - }); + return collectResponse(param.evb, + std::move(requests), + [](ThriftClientType* client, const cpp2::DeleteTagsRequest& r) { + return client->future_deleteTags(r); + }); } folly::Future> StorageClient::updateVertex( @@ -388,12 +383,11 @@ folly::Future> StorageClient::updateVert } request.second = std::move(req); - return getResponse( - param.evb, - std::move(request), - [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::UpdateVertexRequest& r) { - return client->future_updateVertex(r); - }); + return getResponse(param.evb, + std::move(request), + [](ThriftClientType* client, const cpp2::UpdateVertexRequest& r) { + return client->future_updateVertex(r); + }); } folly::Future> StorageClient::updateEdge( @@ -441,14 +435,13 @@ folly::Future> StorageClient::updateEdge } request.second = std::move(req); - return getResponse( - param.evb, - std::move(request), - [useExperimentalFeature = param.useExperimentalFeature]( - cpp2::GraphStorageServiceAsyncClient* client, const cpp2::UpdateEdgeRequest& r) { - return useExperimentalFeature ? client->future_chainUpdateEdge(r) - : client->future_updateEdge(r); - }); + return getResponse(param.evb, + std::move(request), + [useExperimentalFeature = param.useExperimentalFeature]( + ThriftClientType* client, const cpp2::UpdateEdgeRequest& r) { + return useExperimentalFeature ? client->future_chainUpdateEdge(r) + : client->future_updateEdge(r); + }); } folly::Future> StorageClient::getUUID(GraphSpaceID space, @@ -478,11 +471,10 @@ folly::Future> StorageClient::getUUID(GraphSpaceID s req.name_ref() = name; request.second = std::move(req); - return getResponse(evb, - std::move(request), - [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::GetUUIDReq& r) { - return client->future_getUUID(r); - }); + return getResponse( + evb, std::move(request), [](ThriftClientType* client, const cpp2::GetUUIDReq& r) { + return client->future_getUUID(r); + }); } StorageRpcRespFuture StorageClient::lookupIndex( @@ -524,12 +516,11 @@ StorageRpcRespFuture StorageClient::lookupIndex( req.limit_ref() = limit; } - return collectResponse( - param.evb, - std::move(requests), - [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::LookupIndexRequest& r) { - return client->future_lookupIndex(r); - }); + return collectResponse(param.evb, + std::move(requests), + [](ThriftClientType* client, const cpp2::LookupIndexRequest& r) { + return client->future_lookupIndex(r); + }); } StorageRpcRespFuture StorageClient::lookupAndTraverse( @@ -554,12 +545,11 @@ StorageRpcRespFuture StorageClient::lookupAndTravers req.common_ref() = common; } - return collectResponse( - param.evb, - std::move(requests), - [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::LookupAndTraverseRequest& r) { - return client->future_lookupAndTraverse(r); - }); + return collectResponse(param.evb, + std::move(requests), + [](ThriftClientType* client, const cpp2::LookupAndTraverseRequest& r) { + return client->future_lookupAndTraverse(r); + }); } StorageRpcRespFuture StorageClient::scanEdge( @@ -587,10 +577,10 @@ StorageRpcRespFuture StorageClient::scanEdge( req.common_ref() = param.toReqCommon(); } - return collectResponse(param.evb, - std::move(requests), - [](cpp2::GraphStorageServiceAsyncClient* client, - const cpp2::ScanEdgeRequest& r) { return client->future_scanEdge(r); }); + return 
collectResponse( + param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::ScanEdgeRequest& r) { + return client->future_scanEdge(r); + }); } StorageRpcRespFuture StorageClient::scanVertex( @@ -618,12 +608,11 @@ StorageRpcRespFuture StorageClient::scanVertex( req.common_ref() = param.toReqCommon(); } - return collectResponse( - param.evb, - std::move(requests), - [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::ScanVertexRequest& r) { - return client->future_scanVertex(r); - }); + return collectResponse(param.evb, + std::move(requests), + [](ThriftClientType* client, const cpp2::ScanVertexRequest& r) { + return client->future_scanVertex(r); + }); } folly::SemiFuture> StorageClient::get( @@ -646,10 +635,10 @@ folly::SemiFuture> StorageClient::get( req.return_partly_ref() = returnPartly; } - return collectResponse(evb, - std::move(requests), - [](cpp2::GraphStorageServiceAsyncClient* client, - const cpp2::KVGetRequest& r) { return client->future_get(r); }); + return collectResponse( + evb, std::move(requests), [](ThriftClientType* client, const cpp2::KVGetRequest& r) { + return client->future_get(r); + }); } folly::SemiFuture> StorageClient::put( @@ -671,10 +660,10 @@ folly::SemiFuture> StorageClient::put( req.parts_ref() = std::move(c.second); } - return collectResponse(evb, - std::move(requests), - [](cpp2::GraphStorageServiceAsyncClient* client, - const cpp2::KVPutRequest& r) { return client->future_put(r); }); + return collectResponse( + evb, std::move(requests), [](ThriftClientType* client, const cpp2::KVPutRequest& r) { + return client->future_put(r); + }); } folly::SemiFuture> StorageClient::remove( @@ -696,10 +685,10 @@ folly::SemiFuture> StorageClient::remove( req.parts_ref() = std::move(c.second); } - return collectResponse(evb, - std::move(requests), - [](cpp2::GraphStorageServiceAsyncClient* client, - const cpp2::KVRemoveRequest& r) { return client->future_remove(r); }); + return collectResponse( + evb, std::move(requests), [](ThriftClientType* client, const cpp2::KVRemoveRequest& r) { + return client->future_remove(r); + }); } StatusOr> StorageClient::getIdFromRow( diff --git a/src/clients/storage/StorageClient.h b/src/clients/storage/StorageClient.h index ae2fd3d4079..40c0dcb3aca 100644 --- a/src/clients/storage/StorageClient.h +++ b/src/clients/storage/StorageClient.h @@ -9,7 +9,10 @@ #include "clients/storage/StorageClientBase.h" #include "common/base/Base.h" +#include "common/thrift/ThriftClientManager.h" +#include "common/thrift/ThriftLocalClientManager.h" #include "interface/gen-cpp2/GraphStorageServiceAsyncClient.h" +#include "storage/GraphStorageLocalServer.h" namespace nebula { namespace storage { @@ -22,7 +25,18 @@ using StorageRpcRespFuture = folly::SemiFuture>; * * The class is NOT reentrant */ -class StorageClient : public StorageClientBase { +#ifndef BUILD_STANDALONE +using ThriftClientType = cpp2::GraphStorageServiceAsyncClient; +template +using ThriftClientManType = thrift::ThriftClientManager; +#else +using ThriftClientType = GraphStorageLocalServer; +template +using ThriftClientManType = thrift::LocalClientManager; + +#endif +class StorageClient + : public StorageClientBase> { FRIEND_TEST(StorageClientTest, LeaderChangeTest); public: @@ -46,7 +60,8 @@ class StorageClient : public StorageClientBase ioThreadPool, meta::MetaClient* metaClient) - : StorageClientBase(ioThreadPool, metaClient) {} + : StorageClientBase>(ioThreadPool, + metaClient) {} virtual ~StorageClient() {} StorageRpcRespFuture getNeighbors( diff --git 
a/src/clients/storage/StorageClientBase-inl.h b/src/clients/storage/StorageClientBase-inl.h index abeb3ed130c..6341e6dd74e 100644 --- a/src/clients/storage/StorageClientBase-inl.h +++ b/src/clients/storage/StorageClientBase-inl.h @@ -70,50 +70,52 @@ struct ResponseContext { bool fulfilled_{false}; }; -template -StorageClientBase::StorageClientBase( +template +StorageClientBase::StorageClientBase( std::shared_ptr threadPool, meta::MetaClient* metaClient) : metaClient_(metaClient), ioThreadPool_(threadPool) { - clientsMan_ = std::make_unique>(FLAGS_enable_ssl); + clientsMan_ = std::make_unique(FLAGS_enable_ssl); } -template -StorageClientBase::~StorageClientBase() { +template +StorageClientBase::~StorageClientBase() { VLOG(3) << "Destructing StorageClientBase"; if (nullptr != metaClient_) { metaClient_ = nullptr; } } -template -StatusOr StorageClientBase::getLeader(GraphSpaceID spaceId, - PartitionID partId) const { +template +StatusOr StorageClientBase::getLeader( + GraphSpaceID spaceId, PartitionID partId) const { return metaClient_->getStorageLeaderFromCache(spaceId, partId); } -template -void StorageClientBase::updateLeader(GraphSpaceID spaceId, - PartitionID partId, - const HostAddr& leader) { +template +void StorageClientBase::updateLeader(GraphSpaceID spaceId, + PartitionID partId, + const HostAddr& leader) { metaClient_->updateStorageLeader(spaceId, partId, leader); } -template -void StorageClientBase::invalidLeader(GraphSpaceID spaceId, PartitionID partId) { +template +void StorageClientBase::invalidLeader(GraphSpaceID spaceId, + PartitionID partId) { metaClient_->invalidStorageLeader(spaceId, partId); } -template -void StorageClientBase::invalidLeader(GraphSpaceID spaceId, - std::vector& partsId) { +template +void StorageClientBase::invalidLeader( + GraphSpaceID spaceId, std::vector& partsId) { for (const auto& partId : partsId) { invalidLeader(spaceId, partId); } } -template +template template -folly::SemiFuture> StorageClientBase::collectResponse( +folly::SemiFuture> +StorageClientBase::collectResponse( folly::EventBase* evb, std::unordered_map requests, RemoteFunc&& remoteFunc) { @@ -214,9 +216,9 @@ folly::SemiFuture> StorageClientBase::c return context->promise.getSemiFuture(); } -template +template template -folly::Future> StorageClientBase::getResponse( +folly::Future> StorageClientBase::getResponse( folly::EventBase* evb, std::pair&& request, RemoteFunc&& remoteFunc) { auto pro = std::make_shared>>(); auto f = pro->getFuture(); @@ -225,9 +227,9 @@ folly::Future> StorageClientBase::getResponse( return f; } -template +template template -void StorageClientBase::getResponseImpl( +void StorageClientBase::getResponseImpl( folly::EventBase* evb, std::pair request, RemoteFunc remoteFunc, @@ -291,14 +293,14 @@ void StorageClientBase::getResponseImpl( }); // via } -template +template template StatusOr>>> -StorageClientBase::clusterIdsToHosts(GraphSpaceID spaceId, - const Container& ids, - GetIdFunc f) const { +StorageClientBase::clusterIdsToHosts(GraphSpaceID spaceId, + const Container& ids, + GetIdFunc f) const { std::unordered_map>> clusters; @@ -330,9 +332,9 @@ StorageClientBase::clusterIdsToHosts(GraphSpaceID spaceId, return clusters; } -template +template StatusOr>> -StorageClientBase::getHostParts(GraphSpaceID spaceId) const { +StorageClientBase::getHostParts(GraphSpaceID spaceId) const { std::unordered_map> hostParts; auto status = metaClient_->partsNum(spaceId); if (!status.ok()) { @@ -350,9 +352,10 @@ StorageClientBase::getHostParts(GraphSpaceID spaceId) const { return 
hostParts; } -template +template StatusOr>> -StorageClientBase::getHostPartsWithCursor(GraphSpaceID spaceId) const { +StorageClientBase::getHostPartsWithCursor( + GraphSpaceID spaceId) const { std::unordered_map> hostParts; auto status = metaClient_->partsNum(spaceId); if (!status.ok()) { diff --git a/src/clients/storage/StorageClientBase.h b/src/clients/storage/StorageClientBase.h index 3ad810d4802..c86ee653207 100644 --- a/src/clients/storage/StorageClientBase.h +++ b/src/clients/storage/StorageClientBase.h @@ -116,7 +116,7 @@ class StorageRpcResponse final { /** * A base class for all storage clients */ -template +template class StorageClientBase { public: StatusOr getLeader(GraphSpaceID spaceId, PartitionID partId) const; @@ -220,7 +220,7 @@ class StorageClientBase { private: std::shared_ptr ioThreadPool_; - std::unique_ptr> clientsMan_; + std::unique_ptr clientsMan_; }; } // namespace storage diff --git a/src/common/thrift/ThriftClientManager-inl.h b/src/common/thrift/ThriftClientManager-inl.h index f823d43a48e..4b6dfd0ef6f 100644 --- a/src/common/thrift/ThriftClientManager-inl.h +++ b/src/common/thrift/ThriftClientManager-inl.h @@ -10,6 +10,7 @@ #include #include +#include "common/base/Base.h" #include "common/network/NetworkUtils.h" #include "common/ssl/SSLConfig.h" @@ -17,7 +18,6 @@ DECLARE_int32(conn_timeout_ms); namespace nebula { namespace thrift { - template std::shared_ptr ThriftClientManager::client(const HostAddr& host, folly::EventBase* evb, diff --git a/src/common/thrift/ThriftLocalClientManager.h b/src/common/thrift/ThriftLocalClientManager.h new file mode 100644 index 00000000000..c7229937056 --- /dev/null +++ b/src/common/thrift/ThriftLocalClientManager.h @@ -0,0 +1,39 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. 
+ */ +#pragma once + +#include +#include + +#include "common/base/Base.h" +#include "common/datatypes/HostAddr.h" +namespace nebula { +namespace thrift { + +template +class LocalClientManager final { + public: + std::shared_ptr client(const HostAddr& host, + folly::EventBase* evb = nullptr, + bool compatibility = false, + uint32_t timeout = 0) { + UNUSED(host); + UNUSED(evb); + UNUSED(compatibility); + UNUSED(timeout); + return ClientType::getInstance(); + } + + ~LocalClientManager() { + VLOG(3) << "~LocalClientManager"; + } + + explicit LocalClientManager(bool enableSSL = false) { + UNUSED(enableSSL); + VLOG(3) << "LocalClientManager"; + } +}; +} // namespace thrift +} // namespace nebula diff --git a/src/daemons/CMakeLists.txt b/src/daemons/CMakeLists.txt index cc0530863f7..e0b3ec19011 100644 --- a/src/daemons/CMakeLists.txt +++ b/src/daemons/CMakeLists.txt @@ -27,7 +27,6 @@ set(common_deps $ $ $ - $ $ $ $ @@ -57,6 +56,8 @@ set(storage_meta_deps $ ) +if(NOT ENABLE_STANDALONE_VERSION) + nebula_add_executable( NAME nebula-storaged @@ -89,6 +90,7 @@ nebula_add_executable( nebula-metad SOURCES MetaDaemon.cpp + MetaDaemonInit.cpp SetupLogging.cpp SetupBreakpad.cpp OBJECTS @@ -186,3 +188,77 @@ install( COMPONENT meta ) + +else() + +nebula_add_executable( + NAME + nebula-standalone + SOURCES + StandAloneDaemon.cpp + MetaDaemonInit.cpp + SetupLogging.cpp + SetupBreakpad.cpp + OBJECTS + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + ${storage_meta_deps} + ${common_deps} + LIBRARIES + ${ROCKSDB_LIBRARIES} + ${PROXYGEN_LIBRARIES} + ${THRIFT_LIBRARIES} + wangle +) + +install( + TARGETS + nebula-standalone + PERMISSIONS + OWNER_EXECUTE OWNER_WRITE OWNER_READ + GROUP_EXECUTE GROUP_READ + WORLD_EXECUTE WORLD_READ + DESTINATION + bin + COMPONENT + graph ## tmp use graph component +) + +endif() diff --git a/src/daemons/MetaDaemon.cpp b/src/daemons/MetaDaemon.cpp index 06dd0a9c9b8..11960bc7794 100644 --- a/src/daemons/MetaDaemon.cpp +++ b/src/daemons/MetaDaemon.cpp @@ -6,6 +6,7 @@ #include #include +#include "MetaDaemonInit.h" #include "common/base/Base.h" #include "common/base/SignalHandler.h" #include "common/fs/FileUtils.h" @@ -38,22 +39,16 @@ using nebula::ProcessUtils; using nebula::Status; using nebula::StatusOr; using nebula::network::NetworkUtils; -using nebula::web::PathParams; DEFINE_string(local_ip, "", "Local ip specified for NetworkUtils::getLocalIP"); DEFINE_int32(port, 45500, "Meta daemon listening port"); DEFINE_bool(reuse_port, true, "Whether to turn on the SO_REUSEPORT option"); -DEFINE_string(data_path, "", "Root data path"); -DEFINE_string(meta_server_addrs, - "", - "It is a list of IPs split by comma, used in cluster deployment" - "the ips number is equal to the replica number." 
- "If empty, it means it's a single node"); +DECLARE_string(data_path); +DECLARE_string(meta_server_addrs); + // DEFINE_string(local_ip, "", "Local ip specified for // NetworkUtils::getLocalIP"); -DEFINE_int32(num_io_threads, 16, "Number of IO threads"); DEFINE_int32(meta_http_thread_num, 3, "Number of meta daemon's http thread"); -DEFINE_int32(num_worker_threads, 32, "Number of workers"); DEFINE_string(pid_file, "pids/nebula-metad.pid", "File to hold the process id"); DEFINE_bool(daemonize, true, "Whether run as a daemon process"); @@ -68,133 +63,6 @@ extern Status setupLogging(); extern Status setupBreakpad(); #endif -namespace nebula { -namespace meta { -const std::string kClusterIdKey = "__meta_cluster_id_key__"; // NOLINT -} // namespace meta -} // namespace nebula - -nebula::ClusterID gClusterId = 0; - -std::unique_ptr initKV(std::vector peers, - nebula::HostAddr localhost) { - auto partMan = std::make_unique(); - // The meta server has only one space (0), one part (0) - partMan->addPart(nebula::kDefaultSpaceId, nebula::kDefaultPartId, std::move(peers)); - // folly IOThreadPoolExecutor - auto ioPool = std::make_shared(FLAGS_num_io_threads); - std::shared_ptr threadManager( - apache::thrift::concurrency::PriorityThreadManager::newPriorityThreadManager( - FLAGS_num_worker_threads)); - threadManager->setNamePrefix("executor"); - threadManager->start(); - nebula::kvstore::KVOptions options; - - auto absolute = boost::filesystem::absolute(FLAGS_data_path); - options.dataPaths_ = {absolute.string()}; - - options.partMan_ = std::move(partMan); - auto kvstore = std::make_unique( - std::move(options), ioPool, localhost, threadManager); - if (!(kvstore->init())) { - LOG(ERROR) << "Nebula store init failed"; - return nullptr; - } - - LOG(INFO) << "Waiting for the leader elected..."; - nebula::HostAddr leader; - while (true) { - auto ret = kvstore->partLeader(nebula::kDefaultSpaceId, nebula::kDefaultPartId); - if (!nebula::ok(ret)) { - LOG(ERROR) << "Nebula store init failed"; - return nullptr; - } - leader = nebula::value(ret); - if (leader != nebula::HostAddr("", 0)) { - break; - } - LOG(INFO) << "Leader has not been elected, sleep 1s"; - sleep(1); - } - - gClusterId = - nebula::meta::ClusterIdMan::getClusterIdFromKV(kvstore.get(), nebula::meta::kClusterIdKey); - if (gClusterId == 0) { - if (leader == localhost) { - LOG(INFO) << "I am leader, create cluster Id"; - gClusterId = nebula::meta::ClusterIdMan::create(FLAGS_meta_server_addrs); - if (!nebula::meta::ClusterIdMan::persistInKV( - kvstore.get(), nebula::meta::kClusterIdKey, gClusterId)) { - LOG(ERROR) << "Persist cluster failed!"; - return nullptr; - } - } else { - LOG(INFO) << "I am follower, wait for the leader's clusterId"; - while (gClusterId == 0) { - LOG(INFO) << "Waiting for the leader's clusterId"; - sleep(1); - gClusterId = nebula::meta::ClusterIdMan::getClusterIdFromKV(kvstore.get(), - nebula::meta::kClusterIdKey); - } - } - } - - auto version = nebula::meta::MetaVersionMan::getMetaVersionFromKV(kvstore.get()); - LOG(INFO) << "Get meta version is " << static_cast(version); - if (version == nebula::meta::MetaVersion::UNKNOWN) { - LOG(ERROR) << "Meta version is invalid"; - return nullptr; - } else if (version == nebula::meta::MetaVersion::V1) { - if (leader == localhost) { - LOG(INFO) << "I am leader, begin upgrade meta data"; - // need to upgrade the v1.0 meta data format to v2.0 meta data format - auto ret = nebula::meta::MetaVersionMan::updateMetaV1ToV2(kvstore.get()); - if (!ret.ok()) { - LOG(ERROR) << ret; - return nullptr; - 
} - } else { - LOG(INFO) << "I am follower, wait for leader to sync upgrade"; - while (version != nebula::meta::MetaVersion::V2) { - VLOG(1) << "Waiting for leader to upgrade"; - sleep(1); - version = nebula::meta::MetaVersionMan::getMetaVersionFromKV(kvstore.get()); - } - } - } - - if (leader == localhost) { - nebula::meta::MetaVersionMan::setMetaVersionToKV(kvstore.get()); - } - - LOG(INFO) << "Nebula store init succeeded, clusterId " << gClusterId; - return kvstore; -} - -Status initWebService(nebula::WebService* svc, - nebula::kvstore::KVStore* kvstore, - nebula::hdfs::HdfsCommandHelper* helper, - nebula::thread::GenericThreadPool* pool) { - LOG(INFO) << "Starting Meta HTTP Service"; - auto& router = svc->router(); - router.get("/download-dispatch").handler([kvstore, helper, pool](PathParams&&) { - auto handler = new nebula::meta::MetaHttpDownloadHandler(); - handler->init(kvstore, helper, pool); - return handler; - }); - router.get("/ingest-dispatch").handler([kvstore, pool](PathParams&&) { - auto handler = new nebula::meta::MetaHttpIngestHandler(); - handler->init(kvstore, pool); - return handler; - }); - router.get("/replace").handler([kvstore](PathParams&&) { - auto handler = new nebula::meta::MetaHttpReplaceHostHandler(); - handler->init(kvstore); - return handler; - }); - return svc->start(); -} - int main(int argc, char* argv[]) { google::SetVersionString(nebula::versionString()); // Detect if the server has already been started @@ -343,7 +211,8 @@ int main(int argc, char* argv[]) { return EXIT_FAILURE; } - auto handler = std::make_shared(gKVStore.get(), gClusterId); + auto handler = + std::make_shared(gKVStore.get(), metaClusterId()); LOG(INFO) << "The meta daemon start on " << localhost; try { gServer = std::make_unique(); diff --git a/src/daemons/MetaDaemonInit.cpp b/src/daemons/MetaDaemonInit.cpp new file mode 100644 index 00000000000..c41b1e02c06 --- /dev/null +++ b/src/daemons/MetaDaemonInit.cpp @@ -0,0 +1,193 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "MetaDaemonInit.h" + +#include +#include + +#include "common/base/Base.h" +#include "common/base/SignalHandler.h" +#include "common/fs/FileUtils.h" +#include "common/hdfs/HdfsCommandHelper.h" +#include "common/hdfs/HdfsHelper.h" +#include "common/network/NetworkUtils.h" +#include "common/ssl/SSLConfig.h" +#include "common/thread/GenericThreadPool.h" +#include "common/utils/MetaKeyUtils.h" +#include "kvstore/NebulaStore.h" +#include "kvstore/PartManager.h" +#include "meta/ActiveHostsMan.h" +#include "meta/KVBasedClusterIdMan.h" +#include "meta/MetaServiceHandler.h" +#include "meta/MetaVersionMan.h" +#include "meta/http/MetaHttpDownloadHandler.h" +#include "meta/http/MetaHttpIngestHandler.h" +#include "meta/http/MetaHttpReplaceHostHandler.h" +#include "meta/processors/job/JobManager.h" +#include "meta/stats/MetaStats.h" +#include "webservice/Router.h" +#include "webservice/WebService.h" + +#ifndef BUILD_STANDALONE +DEFINE_int32(num_io_threads, 16, "Number of IO threads"); +DEFINE_int32(num_worker_threads, 32, "Number of workers"); +DEFINE_string(data_path, "", "Root data path"); +DEFINE_string(meta_server_addrs, + "", + "It is a list of IPs split by comma, used in cluster deployment" + "the ips number is equal to the replica number." 
+ "If empty, it means it's a single node"); +#else +DEFINE_int32(meta_num_io_threads, 16, "Number of IO threads"); +DEFINE_int32(meta_num_worker_threads, 32, "Number of workers"); +DEFINE_string(meta_data_path, "", "Root data path"); +DECLARE_string(meta_server_addrs); // use define from grap flags. +DECLARE_int32(ws_meta_http_port); +DECLARE_int32(ws_meta_h2_port); +#endif + +using nebula::web::PathParams; + +namespace nebula::meta { +const std::string kClusterIdKey = "__meta_cluster_id_key__"; // NOLINT +} // namespace nebula::meta + +nebula::ClusterID gClusterId = 0; +nebula::ClusterID& metaClusterId() { + return gClusterId; +} + +std::unique_ptr initKV(std::vector peers, + nebula::HostAddr localhost) { + auto partMan = std::make_unique(); + // The meta server has only one space (0), one part (0) + partMan->addPart(nebula::kDefaultSpaceId, nebula::kDefaultPartId, std::move(peers)); +#ifndef BUILD_STANDALONE + int32_t numMetaIoThreads = FLAGS_num_io_threads; + int32_t numMetaWorkerThreads = FLAGS_num_worker_threads; +#else + int32_t numMetaIoThreads = FLAGS_meta_num_io_threads; + int32_t numMetaWorkerThreads = FLAGS_meta_num_worker_threads; +#endif + // folly IOThreadPoolExecutor + auto ioPool = std::make_shared(numMetaIoThreads); + std::shared_ptr threadManager( + apache::thrift::concurrency::PriorityThreadManager::newPriorityThreadManager( + numMetaWorkerThreads)); + threadManager->setNamePrefix("executor"); + threadManager->start(); + nebula::kvstore::KVOptions options; +#ifndef BUILD_STANDALONE + auto absolute = boost::filesystem::absolute(FLAGS_data_path); +#else + auto absolute = boost::filesystem::absolute(FLAGS_meta_data_path); +#endif + options.dataPaths_ = {absolute.string()}; + options.partMan_ = std::move(partMan); + auto kvstore = std::make_unique( + std::move(options), ioPool, localhost, threadManager); + if (!(kvstore->init())) { + LOG(ERROR) << "Nebula store init failed"; + return nullptr; + } + + LOG(INFO) << "Waiting for the leader elected..."; + nebula::HostAddr leader; + while (true) { + auto ret = kvstore->partLeader(nebula::kDefaultSpaceId, nebula::kDefaultPartId); + if (!nebula::ok(ret)) { + LOG(ERROR) << "Nebula store init failed"; + return nullptr; + } + leader = nebula::value(ret); + if (leader != nebula::HostAddr("", 0)) { + break; + } + LOG(INFO) << "Leader has not been elected, sleep 1s"; + sleep(1); + } + + gClusterId = + nebula::meta::ClusterIdMan::getClusterIdFromKV(kvstore.get(), nebula::meta::kClusterIdKey); + if (gClusterId == 0) { + if (leader == localhost) { + LOG(INFO) << "I am leader, create cluster Id"; + gClusterId = nebula::meta::ClusterIdMan::create(FLAGS_meta_server_addrs); + if (!nebula::meta::ClusterIdMan::persistInKV( + kvstore.get(), nebula::meta::kClusterIdKey, gClusterId)) { + LOG(ERROR) << "Persist cluster failed!"; + return nullptr; + } + } else { + LOG(INFO) << "I am follower, wait for the leader's clusterId"; + while (gClusterId == 0) { + LOG(INFO) << "Waiting for the leader's clusterId"; + sleep(1); + gClusterId = nebula::meta::ClusterIdMan::getClusterIdFromKV(kvstore.get(), + nebula::meta::kClusterIdKey); + } + } + } + + auto version = nebula::meta::MetaVersionMan::getMetaVersionFromKV(kvstore.get()); + LOG(INFO) << "Get meta version is " << static_cast(version); + if (version == nebula::meta::MetaVersion::UNKNOWN) { + LOG(ERROR) << "Meta version is invalid"; + return nullptr; + } else if (version == nebula::meta::MetaVersion::V1) { + if (leader == localhost) { + LOG(INFO) << "I am leader, begin upgrade meta data"; + // need to 
upgrade the v1.0 meta data format to v2.0 meta data format + auto ret = nebula::meta::MetaVersionMan::updateMetaV1ToV2(kvstore.get()); + if (!ret.ok()) { + LOG(ERROR) << ret; + return nullptr; + } + } else { + LOG(INFO) << "I am follower, wait for leader to sync upgrade"; + while (version != nebula::meta::MetaVersion::V2) { + VLOG(1) << "Waiting for leader to upgrade"; + sleep(1); + version = nebula::meta::MetaVersionMan::getMetaVersionFromKV(kvstore.get()); + } + } + } + + if (leader == localhost) { + nebula::meta::MetaVersionMan::setMetaVersionToKV(kvstore.get()); + } + + LOG(INFO) << "Nebula store init succeeded, clusterId " << gClusterId; + return kvstore; +} + +nebula::Status initWebService(nebula::WebService* svc, + nebula::kvstore::KVStore* kvstore, + nebula::hdfs::HdfsCommandHelper* helper, + nebula::thread::GenericThreadPool* pool) { + LOG(INFO) << "Starting Meta HTTP Service"; + auto& router = svc->router(); + router.get("/download-dispatch").handler([kvstore, helper, pool](PathParams&&) { + auto handler = new nebula::meta::MetaHttpDownloadHandler(); + handler->init(kvstore, helper, pool); + return handler; + }); + router.get("/ingest-dispatch").handler([kvstore, pool](PathParams&&) { + auto handler = new nebula::meta::MetaHttpIngestHandler(); + handler->init(kvstore, pool); + return handler; + }); + router.get("/replace").handler([kvstore](PathParams&&) { + auto handler = new nebula::meta::MetaHttpReplaceHostHandler(); + handler->init(kvstore); + return handler; + }); +#ifndef BUILD_STANDALONE + return svc->start(); +#else + return svc->start(FLAGS_ws_meta_http_port, FLAGS_ws_meta_h2_port); +#endif +} diff --git a/src/daemons/MetaDaemonInit.h b/src/daemons/MetaDaemonInit.h new file mode 100644 index 00000000000..02dfa6f4eb4 --- /dev/null +++ b/src/daemons/MetaDaemonInit.h @@ -0,0 +1,22 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ +#pragma once + +#include + +#include "common/base/Status.h" +#include "common/hdfs/HdfsCommandHelper.h" +#include "kvstore/KVStore.h" +#include "webservice/WebService.h" + +nebula::ClusterID& metaClusterId(); + +std::unique_ptr initKV(std::vector peers, + nebula::HostAddr localhost); + +nebula::Status initWebService(nebula::WebService* svc, + nebula::kvstore::KVStore* kvstore, + nebula::hdfs::HdfsCommandHelper* helper, + nebula::thread::GenericThreadPool* pool); diff --git a/src/daemons/StandAloneDaemon.cpp b/src/daemons/StandAloneDaemon.cpp new file mode 100644 index 00000000000..8ea8ea1fcb5 --- /dev/null +++ b/src/daemons/StandAloneDaemon.cpp @@ -0,0 +1,444 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. 
+ */ + +#include +#include + +#include +#include +#include +#include + +#include "MetaDaemonInit.h" +#include "common/base/Base.h" +#include "common/base/SignalHandler.h" +#include "common/fs/FileUtils.h" +#include "common/hdfs/HdfsCommandHelper.h" +#include "common/network/NetworkUtils.h" +#include "common/process/ProcessUtils.h" +#include "common/ssl/SSLConfig.h" +#include "common/time/TimezoneInfo.h" +#include "common/utils/MetaKeyUtils.h" +#include "folly/ScopeGuard.h" +#include "graph/service/GraphFlags.h" +#include "graph/service/GraphService.h" +#include "graph/stats/StatsDef.h" +#include "meta/MetaServiceHandler.h" +#include "meta/MetaVersionMan.h" +#include "meta/RootUserMan.h" +#include "meta/http/MetaHttpDownloadHandler.h" +#include "meta/http/MetaHttpIngestHandler.h" +#include "meta/http/MetaHttpReplaceHostHandler.h" +#include "meta/processors/job/JobManager.h" +#include "storage/StorageServer.h" +#include "version/Version.h" +#include "webservice/WebService.h" + +using nebula::fs::FileUtils; +using nebula::graph::GraphService; +using nebula::operator<<; +using nebula::HostAddr; +using nebula::ProcessUtils; +using nebula::Status; +using nebula::StatusOr; +using nebula::network::NetworkUtils; + +void setupThreadManager(); +void printHelp(const char *prog); +void stopAllDaemon(); +static void signalHandler(int sig); +static Status setupSignalHandler(); +extern Status setupLogging(); +#if defined(__x86_64__) +extern Status setupBreakpad(); +#endif + +std::unique_ptr gStorageServer; +static std::unique_ptr gServer; +static std::unique_ptr gMetaServer; +static std::unique_ptr gMetaKVStore; +std::mutex gServerGuard; + +// common flags +DECLARE_string(flagfile); +DECLARE_bool(containerized); +DECLARE_bool(reuse_port); +DECLARE_string(meta_server_addrs); + +// storage gflags +DEFINE_string(data_path, + "", + "Root data path, multi paths should be split by comma." + "For rocksdb engine, one path one instance."); +DEFINE_string(wal_path, + "", + "Nebula wal path. By default, wal will be stored as a sibling of " + "rocksdb data."); +DEFINE_string(listener_path, + "", + "Path for listener, only wal will be saved." 
+ "if it is not empty, data_path will not take effect."); +DECLARE_int32(storage_port); + +// meta gflags +DEFINE_int32(meta_http_thread_num, 3, "Number of meta daemon's http thread"); +DEFINE_int32(meta_port, 45500, "Meta daemon listening port"); + +int main(int argc, char *argv[]) { + google::SetVersionString(nebula::versionString()); + gflags::ParseCommandLineFlags(&argc, &argv, false); + + if (argc == 1) { + printHelp(argv[0]); + return EXIT_FAILURE; + } + if (argc == 2) { + if (::strcmp(argv[1], "-h") == 0) { + printHelp(argv[0]); + return EXIT_SUCCESS; + } + } + + folly::init(&argc, &argv, true); + if (FLAGS_enable_ssl || FLAGS_enable_graph_ssl || FLAGS_enable_meta_ssl) { + folly::ssl::init(); + } + nebula::initCounters(); + + // Setup logging + auto status = setupLogging(); + if (!status.ok()) { + LOG(ERROR) << status; + return EXIT_FAILURE; + } + +#if defined(__x86_64__) + status = setupBreakpad(); + if (!status.ok()) { + LOG(ERROR) << status; + return EXIT_FAILURE; + } +#endif + + // Detect if the server has already been started + auto pidPath = FLAGS_pid_file; + status = ProcessUtils::isPidAvailable(pidPath); + if (!status.ok()) { + LOG(ERROR) << status; + return EXIT_FAILURE; + } + + if (FLAGS_daemonize) { + google::SetStderrLogging(google::FATAL); + } else { + google::SetStderrLogging(google::INFO); + } + + if (FLAGS_daemonize) { + status = ProcessUtils::daemonize(pidPath); + if (!status.ok()) { + LOG(ERROR) << status; + return EXIT_FAILURE; + } + } else { + // Write the current pid into the pid file + status = ProcessUtils::makePidFile(pidPath); + if (!status.ok()) { + LOG(ERROR) << status; + return EXIT_FAILURE; + } + } + + // Validate the IPv4 address or hostname + status = NetworkUtils::validateHostOrIp(FLAGS_local_ip); + if (!status.ok()) { + LOG(ERROR) << status; + return EXIT_FAILURE; + } + + // Initialize the global timezone, it's only used for datetime type compute + // won't affect the process timezone. 
+  status = nebula::time::Timezone::initializeGlobalTimezone();
+  if (!status.ok()) {
+    LOG(ERROR) << status;
+    return EXIT_FAILURE;
+  }
+
+  // Setup the signal handlers
+  status = setupSignalHandler();
+  if (!status.ok()) {
+    LOG(ERROR) << status;
+    return EXIT_FAILURE;
+  }
+
+  if (FLAGS_data_path.empty()) {
+    LOG(ERROR) << "Storage Data Path should not be empty";
+    return EXIT_FAILURE;
+  }
+
+  bool metaReady = false;
+  int32_t metaRet = EXIT_FAILURE;
+  std::unique_ptr<std::thread> metaThread = std::make_unique<std::thread>([&] {
+    SCOPE_EXIT {
+      stopAllDaemon();
+    };
+    nebula::HostAddr metaLocalhost{FLAGS_local_ip, FLAGS_meta_port};
+    LOG(INFO) << "metaLocalhost = " << metaLocalhost;
+    auto peersRet = nebula::network::NetworkUtils::toHosts(FLAGS_meta_server_addrs);
+    if (!peersRet.ok()) {
+      LOG(ERROR) << "Can't get peers address, status:" << peersRet.status();
+      return;
+    }
+    gMetaKVStore = initKV(peersRet.value(), metaLocalhost);
+    if (gMetaKVStore == nullptr) {
+      LOG(ERROR) << "Init kv failed!";
+      return;
+    }
+    LOG(INFO) << "Start http service";
+    auto helper = std::make_unique<nebula::hdfs::HdfsCommandHelper>();
+    auto pool = std::make_unique<nebula::thread::GenericThreadPool>();
+    pool->start(FLAGS_meta_http_thread_num, "http thread pool");
+    auto webSvc = std::make_unique<nebula::WebService>();
+    status = initWebService(webSvc.get(), gMetaKVStore.get(), helper.get(), pool.get());
+    if (!status.ok()) {
+      LOG(ERROR) << "Init web service failed: " << status;
+      return;
+    }
+
+    {
+      nebula::meta::JobManager *jobMgr = nebula::meta::JobManager::getInstance();
+      if (!jobMgr->init(gMetaKVStore.get())) {
+        LOG(ERROR) << "Init job manager failed";
+        return;
+      }
+    }
+
+    {
+      /**
+       * Only leader part needed.
+       */
+      auto ret = gMetaKVStore->partLeader(nebula::kDefaultSpaceId, nebula::kDefaultPartId);
+      if (!nebula::ok(ret)) {
+        LOG(ERROR) << "Part leader get failed";
+        return;
+      }
+      if (nebula::value(ret) == metaLocalhost) {
+        LOG(INFO) << "Check and init root user";
+        if (!nebula::meta::RootUserMan::isUserExists(gMetaKVStore.get())) {
+          if (!nebula::meta::RootUserMan::initRootUser(gMetaKVStore.get())) {
+            LOG(ERROR) << "Init root user failed";
+            return;
+          }
+        }
+      }
+    }
+
+    auto handler =
+        std::make_shared<nebula::meta::MetaServiceHandler>(gMetaKVStore.get(), metaClusterId());
+    LOG(INFO) << "The meta daemon start on " << metaLocalhost;
+    try {
+      gMetaServer = std::make_unique<apache::thrift::ThriftServer>();
+      gMetaServer->setPort(FLAGS_meta_port);
+      gMetaServer->setIdleTimeout(std::chrono::seconds(0));  // No idle timeout on client connection
+      gMetaServer->setInterface(std::move(handler));
+      if (FLAGS_enable_ssl || FLAGS_enable_meta_ssl) {
+        gMetaServer->setSSLConfig(nebula::sslContextConfig());
+      }
+      metaReady = true;
+      gMetaServer->serve();  // Will wait until the server shuts down
+    } catch (const std::exception &e) {
+      LOG(ERROR) << "Exception thrown: " << e.what();
+      return;
+    }
+
+    LOG(INFO) << "The meta Daemon stopped";
+    metaRet = EXIT_SUCCESS;
+    return;
+  });
+
+  constexpr int metaWaitTimeoutInSec = 15;
+  constexpr int metaWaitIntervalInSec = 1;
+  int32_t metaWaitCount = 0;
+
+  while (!metaReady && metaWaitIntervalInSec * metaWaitCount++ < metaWaitTimeoutInSec) {
+    sleep(metaWaitIntervalInSec);
+  }
+
+  if (!metaReady) {
+    LOG(ERROR) << "Meta not ready in time";
+    return EXIT_FAILURE;
+  }
+
+  // start graph server
+  int32_t graphRet = EXIT_FAILURE;
+  std::unique_ptr<std::thread> graphThread = std::make_unique<std::thread>([&] {
+    SCOPE_EXIT {
+      stopAllDaemon();
+    };
+    nebula::HostAddr localhost{FLAGS_local_ip, FLAGS_port};
+    LOG(INFO) << "Starting Graph HTTP Service";
+    auto webSvc = std::make_unique<nebula::WebService>();
+    status = webSvc->start(FLAGS_ws_http_port, FLAGS_ws_h2_port);
+    if (!status.ok()) {
+      LOG(WARNING) << "Failed to start graph HTTP service";
+      return;
+    }
+
+    if (FLAGS_num_netio_threads == 0) {
+      FLAGS_num_netio_threads = std::thread::hardware_concurrency();
+    }
+    if (FLAGS_num_netio_threads <= 0) {
+      LOG(WARNING) << "Number of networking IO threads should be greater than zero";
+      return;
+    }
+    LOG(INFO) << "Number of networking IO threads: " << FLAGS_num_netio_threads;
+
+    if (FLAGS_num_worker_threads == 0) {
+      FLAGS_num_worker_threads = std::thread::hardware_concurrency();
+    }
+    if (FLAGS_num_worker_threads <= 0) {
+      LOG(WARNING) << "Number of worker threads should be greater than zero";
+      return;
+    }
+    LOG(INFO) << "Number of worker threads: " << FLAGS_num_worker_threads;
+
+    auto threadFactory = std::make_shared<folly::NamedThreadFactory>("graph-netio");
+    auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_netio_threads,
+                                                                      std::move(threadFactory));
+    gServer = std::make_unique<apache::thrift::ThriftServer>();
+    gServer->setIOThreadPool(ioThreadPool);
+
+    auto interface = std::make_shared<GraphService>();
+    status = interface->init(ioThreadPool, localhost);
+    if (!status.ok()) {
+      LOG(ERROR) << status;
+      return;
+    }
+
+    gServer->setPort(localhost.port);
+    gServer->setInterface(std::move(interface));
+    gServer->setReusePort(FLAGS_reuse_port);
+    gServer->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs));
+    gServer->setNumAcceptThreads(FLAGS_num_accept_threads);
+    gServer->setListenBacklog(FLAGS_listen_backlog);
+    if (FLAGS_enable_ssl || FLAGS_enable_graph_ssl) {
+      gServer->setSSLConfig(nebula::sslContextConfig());
+    }
+    setupThreadManager();
+    // Modify two blocking service
+    FLOG_INFO("Starting nebula-graphd on %s:%d\n", localhost.host.c_str(), localhost.port);
+    try {
+      gServer->serve();  // Blocking wait until shut down via gServer->stop()
+    } catch (const std::exception &e) {
+      FLOG_ERROR("Exception thrown while starting the RPC server: %s", e.what());
+      return;
+    }
+    FLOG_INFO("nebula-graphd on %s:%d has been stopped", localhost.host.c_str(), localhost.port);
+    graphRet = EXIT_SUCCESS;
+    return;
+  });
+
+  int32_t storageRet = EXIT_FAILURE;
+  std::unique_ptr<std::thread> storageThread = std::make_unique<std::thread>([&] {
+    SCOPE_EXIT {
+      stopAllDaemon();
+    };
+    HostAddr host(FLAGS_local_ip, FLAGS_storage_port);
+    LOG(INFO) << "host = " << host;
+    auto metaAddrsRet = nebula::network::NetworkUtils::toHosts(FLAGS_meta_server_addrs);
+    if (!metaAddrsRet.ok() || metaAddrsRet.value().empty()) {
+      LOG(ERROR) << "Can't get metaServer address, status:" << metaAddrsRet.status()
+                 << ", FLAGS_meta_server_addrs:" << FLAGS_meta_server_addrs;
+      return;
+    }
+
+    std::vector<std::string> paths;
+    folly::split(",", FLAGS_data_path, paths, true);
+    std::transform(paths.begin(), paths.end(), paths.begin(), [](auto &p) {
+      return folly::trimWhitespace(p).str();
+    });
+    if (paths.empty()) {
+      LOG(ERROR) << "Bad data_path format:" << FLAGS_data_path;
+      return;
+    }
+    gStorageServer = std::make_unique<nebula::storage::StorageServer>(
+        host, metaAddrsRet.value(), paths, FLAGS_wal_path, FLAGS_listener_path);
+    if (!gStorageServer->start()) {
+      LOG(ERROR) << "Storage server start failed";
+      gStorageServer->stop();
+      return;
+    }
+    gStorageServer->waitUntilStop();
+    LOG(INFO) << "The storage Daemon stopped";
+    storageRet = EXIT_SUCCESS;
+    return;
+  });
+
+  metaThread->join();
+  graphThread->join();
+  storageThread->join();
+  if (metaRet != EXIT_SUCCESS || graphRet != EXIT_SUCCESS || storageRet != EXIT_SUCCESS) {
+    return EXIT_FAILURE;
+  }
+  return EXIT_SUCCESS;
+}
+
+Status setupSignalHandler() {
+  return nebula::SignalHandler::install(
+      {SIGINT, SIGTERM},
+      [](nebula::SignalHandler::GeneralSignalInfo *info) {
signalHandler(info->sig()); }); +} + +void stopAllDaemon() { + std::lock_guard guard(gServerGuard); + if (gServer) { + gServer->stop(); + gServer.reset(); + } + if (gStorageServer) { + gStorageServer->stop(); + gStorageServer.reset(); + } + if (gMetaServer) { + gMetaServer->stop(); + gMetaServer.reset(); + } + { + auto gJobMgr = nebula::meta::JobManager::getInstance(); + if (gJobMgr) { + gJobMgr->shutDown(); + } + } + if (gMetaKVStore) { + gMetaKVStore->stop(); + gMetaKVStore.reset(); + } +} + +void signalHandler(int sig) { + switch (sig) { + case SIGINT: + case SIGTERM: + FLOG_INFO("Signal %d(%s) received, stopping this server", sig, ::strsignal(sig)); + stopAllDaemon(); + break; + default: + FLOG_ERROR("Signal %d(%s) received but ignored", sig, ::strsignal(sig)); + } +} + +void printHelp(const char *prog) { + fprintf(stderr, "%s --flagfile \n", prog); +} + +void setupThreadManager() { + int numThreads = + FLAGS_num_worker_threads > 0 ? FLAGS_num_worker_threads : gServer->getNumIOWorkerThreads(); + std::shared_ptr threadManager( + PriorityThreadManager::newPriorityThreadManager(numThreads, false /*stats*/)); + threadManager->setNamePrefix("executor"); + threadManager->start(); + gServer->setThreadManager(threadManager); +} diff --git a/src/graph/service/GraphFlags.cpp b/src/graph/service/GraphFlags.cpp index e31439ff2c3..32fd80f2c14 100644 --- a/src/graph/service/GraphFlags.cpp +++ b/src/graph/service/GraphFlags.cpp @@ -52,7 +52,14 @@ DEFINE_int64(max_allowed_connections, DEFINE_bool(enable_optimizer, false, "Whether to enable optimizer"); +#ifndef BUILD_STANDALONE DEFINE_uint32(ft_request_retry_times, 3, "Retry times if fulltext request failed"); +DEFINE_bool(enable_client_white_list, true, "Turn on/off the client white list."); +DEFINE_string(client_white_list, + nebula::getOriginVersion() + ":2.5.0:2.5.1:2.6.0", + "A white list for different client versions, separate with colon."); + +#endif DEFINE_bool(accept_partial_success, false, "Whether to accept partial success, default false"); @@ -63,11 +70,6 @@ DEFINE_bool(disable_octal_escape_char, DEFINE_bool(enable_experimental_feature, false, "Whether to enable experimental feature"); -DEFINE_bool(enable_client_white_list, true, "Turn on/off the client white list."); -DEFINE_string(client_white_list, - nebula::getOriginVersion() + ":2.5.0:2.5.1:2.6.0", - "A white list for different client versions, separate with colon."); - DEFINE_int32(num_rows_to_check_memory, 1024, "number rows to check memory"); // Sanity-checking Flag Values diff --git a/src/kvstore/Listener.cpp b/src/kvstore/Listener.cpp index 8e5129dfb56..82a6e4e2a76 100644 --- a/src/kvstore/Listener.cpp +++ b/src/kvstore/Listener.cpp @@ -11,7 +11,7 @@ DEFINE_int32(listener_commit_interval_secs, 1, "Listener commit interval"); DEFINE_int32(listener_commit_batch_size, 1000, "Max batch size when listener commit"); -DEFINE_int32(ft_request_retry_times, 3, "Retry times if fulltext request failed"); +DEFINE_uint32(ft_request_retry_times, 3, "Retry times if fulltext request failed"); DEFINE_int32(ft_bulk_batch_size, 100, "Max batch size when bulk insert"); DEFINE_int32(listener_pursue_leader_threshold, 1000, "Catch up with the leader's threshold"); diff --git a/src/kvstore/plugins/elasticsearch/ESListener.cpp b/src/kvstore/plugins/elasticsearch/ESListener.cpp index 2d899d61a6c..75b7918eb40 100644 --- a/src/kvstore/plugins/elasticsearch/ESListener.cpp +++ b/src/kvstore/plugins/elasticsearch/ESListener.cpp @@ -8,7 +8,7 @@ #include "common/plugin/fulltext/elasticsearch/ESStorageAdapter.h" 
#include "common/utils/NebulaKeyUtils.h" -DECLARE_int32(ft_request_retry_times); +DECLARE_uint32(ft_request_retry_times); DECLARE_int32(ft_bulk_batch_size); namespace nebula { diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index b4b71d961a7..5f7d027e739 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -89,5 +89,11 @@ nebula_add_library( StorageServer.cpp ) + +nebula_add_library( + storage_local_server_obj OBJECT + GraphStorageLocalServer.cpp +) + nebula_add_subdirectory(stats) nebula_add_subdirectory(test) diff --git a/src/storage/GraphStorageLocalServer.cpp b/src/storage/GraphStorageLocalServer.cpp new file mode 100644 index 00000000000..9627d1c4588 --- /dev/null +++ b/src/storage/GraphStorageLocalServer.cpp @@ -0,0 +1,150 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "GraphStorageLocalServer.h" + +#include + +#include "common/base/Base.h" + +#define LOCAL_RETURN_FUTURE(threadManager, respType, callFunc) \ + auto promise = std::make_shared>(); \ + auto f = promise->getFuture(); \ + threadManager->add([&, promise] { \ + handler_->callFunc(request).thenValue( \ + [promise](respType&& resp) { promise->setValue(std::move(resp)); }); \ + }); \ + return f; + +namespace nebula::storage { + +std::mutex mutex_; +std::shared_ptr instance_ = nullptr; + +void GraphStorageLocalServer::setThreadManager( + std::shared_ptr threadManager) { + // lock? + threadManager_ = threadManager; +} + +void GraphStorageLocalServer::setInterface(std::shared_ptr handler) { + handler_ = handler; +} + +void GraphStorageLocalServer::serve() { + if (serving_) { + LOG(WARNING) << "Server already serving"; + return; + } + // do nothing, wait stop + serving_ = true; + sem_.wait(); +} + +void GraphStorageLocalServer::stop() { + if (!serving_) { + LOG(WARNING) << "Can't stop server not serving"; + return; + } + sem_.signal(); + serving_ = false; +} + +folly::Future GraphStorageLocalServer::future_getNeighbors( + const cpp2::GetNeighborsRequest& request) { + LOCAL_RETURN_FUTURE(threadManager_, cpp2::GetNeighborsResponse, future_getNeighbors); +} + +folly::Future GraphStorageLocalServer::future_addVertices( + const cpp2::AddVerticesRequest& request) { + LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_addVertices); +} + +folly::Future GraphStorageLocalServer::future_chainAddEdges( + const cpp2::AddEdgesRequest& request) { + LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_chainAddEdges); +} + +folly::Future GraphStorageLocalServer::future_addEdges( + const cpp2::AddEdgesRequest& request) { + LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_addEdges); +} + +folly::Future GraphStorageLocalServer::future_getProps( + const cpp2::GetPropRequest& request) { + LOCAL_RETURN_FUTURE(threadManager_, cpp2::GetPropResponse, future_getProps); +} + +folly::Future GraphStorageLocalServer::future_deleteEdges( + const cpp2::DeleteEdgesRequest& request) { + LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_deleteEdges); +} + +folly::Future GraphStorageLocalServer::future_deleteVertices( + const cpp2::DeleteVerticesRequest& request) { + LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_deleteVertices); +} + +folly::Future GraphStorageLocalServer::future_deleteTags( + const cpp2::DeleteTagsRequest& request) { + LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_deleteTags); +} + +folly::Future 
+folly::Future<cpp2::UpdateResponse> GraphStorageLocalServer::future_updateVertex(
+    const cpp2::UpdateVertexRequest& request) {
+  LOCAL_RETURN_FUTURE(threadManager_, cpp2::UpdateResponse, future_updateVertex);
+}
+
+folly::Future<cpp2::UpdateResponse> GraphStorageLocalServer::future_chainUpdateEdge(
+    const cpp2::UpdateEdgeRequest& request) {
+  LOCAL_RETURN_FUTURE(threadManager_, cpp2::UpdateResponse, future_chainUpdateEdge);
+}
+
+folly::Future<cpp2::UpdateResponse> GraphStorageLocalServer::future_updateEdge(
+    const cpp2::UpdateEdgeRequest& request) {
+  LOCAL_RETURN_FUTURE(threadManager_, cpp2::UpdateResponse, future_updateEdge);
+}
+
+folly::Future<cpp2::GetUUIDResp> GraphStorageLocalServer::future_getUUID(
+    const cpp2::GetUUIDReq& request) {
+  LOCAL_RETURN_FUTURE(threadManager_, cpp2::GetUUIDResp, future_getUUID);
+}
+
+folly::Future<cpp2::LookupIndexResp> GraphStorageLocalServer::future_lookupIndex(
+    const cpp2::LookupIndexRequest& request) {
+  LOCAL_RETURN_FUTURE(threadManager_, cpp2::LookupIndexResp, future_lookupIndex);
+}
+
+folly::Future<cpp2::GetNeighborsResponse> GraphStorageLocalServer::future_lookupAndTraverse(
+    const cpp2::LookupAndTraverseRequest& request) {
+  LOCAL_RETURN_FUTURE(threadManager_, cpp2::GetNeighborsResponse, future_lookupAndTraverse);
+}
+
+folly::Future<cpp2::ScanResponse> GraphStorageLocalServer::future_scanVertex(
+    const cpp2::ScanVertexRequest& request) {
+  LOCAL_RETURN_FUTURE(threadManager_, cpp2::ScanResponse, future_scanVertex);
+}
+
+folly::Future<cpp2::ScanResponse> GraphStorageLocalServer::future_scanEdge(
+    const cpp2::ScanEdgeRequest& request) {
+  LOCAL_RETURN_FUTURE(threadManager_, cpp2::ScanResponse, future_scanEdge);
+}
+
+folly::Future<cpp2::KVGetResponse> GraphStorageLocalServer::future_get(
+    const cpp2::KVGetRequest& request) {
+  LOCAL_RETURN_FUTURE(threadManager_, cpp2::KVGetResponse, future_get);
+}
+
+folly::Future<cpp2::ExecResponse> GraphStorageLocalServer::future_put(
+    const cpp2::KVPutRequest& request) {
+  LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_put);
+}
+
+folly::Future<cpp2::ExecResponse> GraphStorageLocalServer::future_remove(
+    const cpp2::KVRemoveRequest& request) {
+  LOCAL_RETURN_FUTURE(threadManager_, cpp2::ExecResponse, future_remove);
+}
+
+}  // namespace nebula::storage
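Every future_* method in GraphStorageLocalServer.cpp is generated by LOCAL_RETURN_FUTURE: rather than answering a Thrift RPC, it schedules the call on the shared ThreadManager and forwards the handler's response through a folly::Promise. Expanded by hand for future_getNeighbors, the macro yields roughly the following (a sketch of the expansion, not extra code in the patch); note that `request` is captured by reference, so the caller must keep it alive until the handler has run:

folly::Future<cpp2::GetNeighborsResponse> GraphStorageLocalServer::future_getNeighbors(
    const cpp2::GetNeighborsRequest& request) {
  auto promise = std::make_shared<folly::Promise<cpp2::GetNeighborsResponse>>();
  auto f = promise->getFuture();
  threadManager_->add([&, promise] {
    // The local handler is called directly: no serialization and no network hop.
    handler_->future_getNeighbors(request).thenValue(
        [promise](cpp2::GetNeighborsResponse&& resp) { promise->setValue(std::move(resp)); });
  });
  return f;
}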
diff --git a/src/storage/GraphStorageLocalServer.h b/src/storage/GraphStorageLocalServer.h
new file mode 100644
index 00000000000..10e27463f7c
--- /dev/null
+++ b/src/storage/GraphStorageLocalServer.h
@@ -0,0 +1,66 @@
+/* Copyright (c) 2021 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+#pragma once
+
+#include
+#include
+
+#include
+
+#include "common/base/Base.h"
+#include "folly/fibers/Semaphore.h"
+#include "interface/gen-cpp2/GraphStorageServiceAsyncClient.h"
+#include "storage/GraphStorageServiceHandler.h"
+
+namespace nebula::storage {
+class GraphStorageLocalServer final : public nebula::cpp::NonCopyable,
+                                      public nebula::cpp::NonMovable {
+ public:
+  static std::shared_ptr<GraphStorageLocalServer> getInstance() {
+    static std::shared_ptr<GraphStorageLocalServer> instance{new GraphStorageLocalServer()};
+    return instance;
+  }
+  void setThreadManager(std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager);
+  void setInterface(std::shared_ptr<GraphStorageServiceHandler> iface);
+  void stop();
+  void serve();
+
+ public:
+  folly::Future<cpp2::GetNeighborsResponse> future_getNeighbors(
+      const cpp2::GetNeighborsRequest& request);
+  folly::Future<cpp2::ExecResponse> future_addVertices(const cpp2::AddVerticesRequest& request);
+  folly::Future<cpp2::ExecResponse> future_chainAddEdges(const cpp2::AddEdgesRequest& request);
+  folly::Future<cpp2::ExecResponse> future_addEdges(const cpp2::AddEdgesRequest& request);
+  folly::Future<cpp2::GetPropResponse> future_getProps(const cpp2::GetPropRequest& request);
+  folly::Future<cpp2::ExecResponse> future_deleteEdges(const cpp2::DeleteEdgesRequest& request);
+  folly::Future<cpp2::ExecResponse> future_deleteVertices(
+      const cpp2::DeleteVerticesRequest& request);
+  folly::Future<cpp2::ExecResponse> future_deleteTags(const cpp2::DeleteTagsRequest& request);
+  folly::Future<cpp2::UpdateResponse> future_updateVertex(const cpp2::UpdateVertexRequest& request);
+  folly::Future<cpp2::UpdateResponse> future_chainUpdateEdge(
+      const cpp2::UpdateEdgeRequest& request);
+  folly::Future<cpp2::UpdateResponse> future_updateEdge(const cpp2::UpdateEdgeRequest& request);
+  folly::Future<cpp2::GetUUIDResp> future_getUUID(const cpp2::GetUUIDReq& request);
+  folly::Future<cpp2::LookupIndexResp> future_lookupIndex(const cpp2::LookupIndexRequest& request);
+  folly::Future<cpp2::GetNeighborsResponse> future_lookupAndTraverse(
+      const cpp2::LookupAndTraverseRequest& request);
+  folly::Future<cpp2::ScanResponse> future_scanVertex(const cpp2::ScanVertexRequest& request);
+  folly::Future<cpp2::ScanResponse> future_scanEdge(const cpp2::ScanEdgeRequest& request);
+  folly::Future<cpp2::KVGetResponse> future_get(const cpp2::KVGetRequest& request);
+  folly::Future<cpp2::ExecResponse> future_put(const cpp2::KVPutRequest& request);
+  folly::Future<cpp2::ExecResponse> future_remove(const cpp2::KVRemoveRequest& request);
+
+ private:
+  GraphStorageLocalServer() = default;
+
+ private:
+  std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager_;
+  std::shared_ptr<GraphStorageServiceHandler> handler_;
+  folly::fibers::Semaphore sem_{0};
+  static std::mutex mutex_;
+  bool serving_ = {false};
+};
+}  // namespace nebula::storage
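getInstance() in the header relies on a function-local static: C++11 guarantees that initialization of a local static is thread-safe, so no explicit locking is needed to construct the singleton (the file-scope mutex_ and instance_ in the .cpp do not take part in construction). A minimal sketch of the same pattern with a hypothetical class name:

#include <memory>

class LocalServer {  // hypothetical stand-in for GraphStorageLocalServer
 public:
  static std::shared_ptr<LocalServer> getInstance() {
    // Initialization of a function-local static is thread-safe since C++11.
    static std::shared_ptr<LocalServer> instance{new LocalServer()};
    return instance;
  }

 private:
  LocalServer() = default;  // private ctor: callers must go through getInstance()
};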
diff --git a/src/storage/StorageServer.cpp b/src/storage/StorageServer.cpp
index f29fdb490a1..66b697031c3 100644
--- a/src/storage/StorageServer.cpp
+++ b/src/storage/StorageServer.cpp
@@ -21,6 +21,7 @@
 #include "kvstore/RocksEngine.h"
 #include "storage/BaseProcessor.h"
 #include "storage/CompactionFilter.h"
+#include "storage/GraphStorageLocalServer.h"
 #include "storage/GraphStorageServiceHandler.h"
 #include "storage/InternalStorageServiceHandler.h"
 #include "storage/StorageAdminServiceHandler.h"
@@ -35,12 +36,18 @@
 #include "webservice/Router.h"
 #include "webservice/WebService.h"
 
+#ifndef BUILD_STANDALONE
 DEFINE_int32(port, 44500, "Storage daemon listening port");
-DEFINE_int32(num_io_threads, 16, "Number of IO threads");
 DEFINE_int32(num_worker_threads, 32, "Number of workers");
-DEFINE_int32(storage_http_thread_num, 3, "Number of storage daemon's http thread");
 DEFINE_bool(local_config, false, "meta client will not retrieve latest configuration from meta");
+#else
+DEFINE_int32(storage_port, 44501, "Storage daemon listening port");
+DEFINE_int32(storage_num_worker_threads, 32, "Number of workers");
+DECLARE_bool(local_config);
+#endif
 DEFINE_bool(storage_kv_mode, false, "True for kv mode");
+DEFINE_int32(num_io_threads, 16, "Number of IO threads");
+DEFINE_int32(storage_http_thread_num, 3, "Number of storage daemon's http thread");
 
 namespace nebula {
 namespace storage {
@@ -117,7 +124,11 @@ bool StorageServer::initWebService() {
     return new storage::StorageHttpPropertyHandler(schemaMan_.get(), kvstore_.get());
   });
 
+#ifndef BUILD_STANDALONE
   auto status = webSvc_->start();
+#else
+  auto status = webSvc_->start(FLAGS_ws_storage_http_port, FLAGS_ws_storage_h2_port);
+#endif
   return status.ok();
 }
 
@@ -148,8 +159,13 @@ int32_t StorageServer::getAdminStoreSeqId() {
 bool StorageServer::start() {
   ioThreadPool_ = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_io_threads);
+#ifndef BUILD_STANDALONE
+  const int32_t numWorkerThreads = FLAGS_num_worker_threads;
+#else
+  const int32_t numWorkerThreads = FLAGS_storage_num_worker_threads;
+#endif
   workers_ = apache::thrift::concurrency::PriorityThreadManager::newPriorityThreadManager(
-      FLAGS_num_worker_threads);
+      numWorkerThreads);
   workers_->setNamePrefix("executor");
   workers_->start();
 
@@ -222,17 +238,21 @@ bool StorageServer::start() {
   storageThread_.reset(new std::thread([this] {
     try {
       auto handler = std::make_shared<GraphStorageServiceHandler>(env_.get());
+#ifndef BUILD_STANDALONE
       storageServer_ = std::make_unique<apache::thrift::ThriftServer>();
       storageServer_->setPort(FLAGS_port);
       storageServer_->setIdleTimeout(std::chrono::seconds(0));
       storageServer_->setIOThreadPool(ioThreadPool_);
-      storageServer_->setThreadManager(workers_);
       storageServer_->setStopWorkersOnStopListening(false);
-      storageServer_->setInterface(std::move(handler));
       if (FLAGS_enable_ssl) {
         storageServer_->setSSLConfig(nebula::sslContextConfig());
       }
+#else
+      storageServer_ = GraphStorageLocalServer::getInstance();
+#endif
+      storageServer_->setThreadManager(workers_);
+      storageServer_->setInterface(std::move(handler));
 
       ServiceStatus expected = STATUS_UNINITIALIZED;
       if (!storageSvcStatus_.compare_exchange_strong(expected, STATUS_RUNNING)) {
         LOG(ERROR) << "Impossible! How could it happen!";
diff --git a/src/storage/StorageServer.h b/src/storage/StorageServer.h
index 27795c5eaf5..2f907263ef9 100644
--- a/src/storage/StorageServer.h
+++ b/src/storage/StorageServer.h
@@ -15,6 +15,7 @@
 #include "common/meta/SchemaManager.h"
 #include "kvstore/NebulaStore.h"
 #include "storage/CommonUtils.h"
+#include "storage/GraphStorageLocalServer.h"
 #include "storage/admin/AdminTaskManager.h"
 #include "storage/transaction/TransactionManager.h"
 
@@ -64,7 +65,11 @@ class StorageServer final {
   std::atomic<ServiceStatus> storageSvcStatus_{STATUS_UNINITIALIZED};
   std::atomic<ServiceStatus> adminSvcStatus_{STATUS_UNINITIALIZED};
 
+#ifndef BUILD_STANDALONE
   std::unique_ptr<apache::thrift::ThriftServer> storageServer_;
+#else
+  std::shared_ptr<GraphStorageLocalServer> storageServer_;
+#endif
   std::unique_ptr<apache::thrift::ThriftServer> adminServer_;
 
   std::unique_ptr<std::thread> internalStorageThread_;
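The flag reshuffling in StorageServer.cpp is presumably needed because the standalone binary links graphd's and storaged's flag definitions together: gflags aborts at startup if two translation units define a flag with the same name, so the storage-side port, num_worker_threads, and local_config definitions are compiled out or renamed to storage_* when BUILD_STANDALONE is set. A minimal reproduction of the conflict, with hypothetical file names:

// graph_flags.cpp
#include <gflags/gflags.h>
DEFINE_int32(port, 9669, "Graph daemon listening port");

// storage_flags.cpp
#include <gflags/gflags.h>
DEFINE_int32(port, 9779, "Storage daemon listening port");

// Linking both objects into one binary makes gflags fail at startup with a
// duplicate-definition error (roughly: "flag 'port' was defined more than once").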
diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt
index 9c0e31c33f0..efc6bdb4d52 100644
--- a/src/tools/CMakeLists.txt
+++ b/src/tools/CMakeLists.txt
@@ -1,5 +1,9 @@
+# Note: the standalone version doesn't have dependent test tools for now.
+
+if(NOT ENABLE_STANDALONE_VERSION)
 nebula_add_subdirectory(storage-perf)
 nebula_add_subdirectory(simple-kv-verify)
+endif()
 nebula_add_subdirectory(meta-dump)
 nebula_add_subdirectory(db-dump)
 nebula_add_subdirectory(db-upgrade)
diff --git a/src/webservice/WebService.cpp b/src/webservice/WebService.cpp
index 196734a8b13..393bc47fe4a 100644
--- a/src/webservice/WebService.cpp
+++ b/src/webservice/WebService.cpp
@@ -22,6 +22,10 @@ DEFINE_int32(ws_h2_port, 11002, "Port to listen on with HTTP/2 protocol");
 DEFINE_string(ws_ip, "0.0.0.0", "IP/Hostname to bind to");
 DEFINE_int32(ws_threads, 4, "Number of threads for the web service.");
 
+#ifdef BUILD_STANDALONE
+DEFINE_int32(ws_storage_threads, 4, "Number of threads for the web service.");
+#endif
+
 namespace nebula {
 namespace {
 
@@ -57,7 +61,7 @@ WebService::~WebService() {
   wsThread_->join();
 }
 
-Status WebService::start() {
+Status WebService::start(uint16_t httpPort, uint16_t h2Port) {
   if (started_) {
     LOG(INFO) << "Web service has been started.";
     return Status::OK();
@@ -87,8 +91,8 @@ Status WebService::start() {
   started_ = true;
 
   std::vector<HTTPServer::IPConfig> ips = {
-      {SocketAddress(FLAGS_ws_ip, FLAGS_ws_http_port, true), HTTPServer::Protocol::HTTP},
-      {SocketAddress(FLAGS_ws_ip, FLAGS_ws_h2_port, true), HTTPServer::Protocol::HTTP2},
+      {SocketAddress(FLAGS_ws_ip, httpPort, true), HTTPServer::Protocol::HTTP},
+      {SocketAddress(FLAGS_ws_ip, h2Port, true), HTTPServer::Protocol::HTTP2},
   };
 
   CHECK_GT(FLAGS_ws_threads, 0) << "The number of webservice threads must be greater than zero";
diff --git a/src/webservice/WebService.h b/src/webservice/WebService.h
index bd34004abd4..a6b760f486c 100644
--- a/src/webservice/WebService.h
+++ b/src/webservice/WebService.h
@@ -13,6 +13,12 @@ DECLARE_int32(ws_h2_port);
 DECLARE_string(ws_ip);
 DECLARE_int32(ws_threads);
 
+#ifdef BUILD_STANDALONE
+DECLARE_int32(ws_storage_http_port);
+DECLARE_int32(ws_storage_h2_port);
+DECLARE_int32(ws_storage_threads);
+#endif
+
 namespace proxygen {
 class HTTPServer;
 class RequestHandler;
@@ -41,7 +47,8 @@ class WebService final {
   // Two ports would be bound, one for HTTP, another one for HTTP2.
   // If FLAGS_ws_http_port or FLAGS_ws_h2_port is zero, an ephemeral port
   // would be assigned and set back to the gflag, respectively.
-  NG_MUST_USE_RESULT Status start();
+  NG_MUST_USE_RESULT Status start(uint16_t httpPort = FLAGS_ws_http_port,
+                                  uint16_t h2Port = FLAGS_ws_h2_port);
 
   // Check whether web service is started
   bool started() const {