Skip to content

Commit

Permalink
refactor listener structure (vesoft-inc#4953)
Browse files Browse the repository at this point in the history
Co-authored-by: Yee <[email protected]>
Co-authored-by: Sophie <[email protected]>
  • Loading branch information
3 people authored Nov 29, 2022
1 parent 0ff53a3 commit e1a0283
Show file tree
Hide file tree
Showing 16 changed files with 91 additions and 40 deletions.
1 change: 1 addition & 0 deletions src/daemons/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ set(storage_meta_deps
$<TARGET_OBJECTS:storage_client_stats_obj>
$<TARGET_OBJECTS:kv_stats_obj>
$<TARGET_OBJECTS:raftex_obj>
$<TARGET_OBJECTS:listener_obj>
$<TARGET_OBJECTS:wal_obj>
$<TARGET_OBJECTS:disk_man_obj>
$<TARGET_OBJECTS:keyutils_obj>
Expand Down
3 changes: 1 addition & 2 deletions src/kvstore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
nebula_add_library(
kvstore_obj OBJECT
Part.cpp
Listener.cpp
RocksEngine.cpp
PartManager.cpp
NebulaStore.cpp
RocksEngineConfig.cpp
NebulaSnapshotManager.cpp
RateLimiter.cpp
plugins/elasticsearch/ESListener.cpp
)

nebula_add_library(
Expand All @@ -21,6 +19,7 @@ nebula_add_library(
)

nebula_add_subdirectory(raftex)
nebula_add_subdirectory(listener)
nebula_add_subdirectory(wal)
nebula_add_subdirectory(stats)
nebula_add_subdirectory(test)
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include "common/utils/NebulaKeyUtils.h"
#include "kvstore/NebulaSnapshotManager.h"
#include "kvstore/RocksEngine.h"
#include "kvstore/plugins/elasticsearch/ESListener.h"
#include "kvstore/listener/elasticsearch/ESListener.h"

DEFINE_string(engine_type, "rocksdb", "rocksdb, memory...");
DEFINE_int32(custom_filter_interval_secs,
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
#include "kvstore/DiskManager.h"
#include "kvstore/KVEngine.h"
#include "kvstore/KVStore.h"
#include "kvstore/Listener.h"
#include "kvstore/Part.h"
#include "kvstore/PartManager.h"
#include "kvstore/listener/Listener.h"
#include "kvstore/raftex/RaftexService.h"
#include "kvstore/raftex/SnapshotManager.h"

Expand Down
11 changes: 11 additions & 0 deletions src/kvstore/listener/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Copyright (c) 2022 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.

nebula_add_library(
listener_obj OBJECT
Listener.cpp
elasticsearch/ESListener.cpp
)

nebula_add_subdirectory(test)
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* This source code is licensed under Apache 2.0 License.
*/

#include "kvstore/Listener.h"
#include "kvstore/listener/Listener.h"

#include "codec/RowReaderWrapper.h"
#include "common/time/WallClock.h"
Expand Down Expand Up @@ -155,8 +155,6 @@ void Listener::doApply() {
});
}



void Listener::resetListener() {
std::lock_guard<std::mutex> g(raftLock_);
reset();
Expand Down
7 changes: 3 additions & 4 deletions src/kvstore/Listener.h → src/kvstore/listener/Listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
* This source code is licensed under Apache 2.0 License.
*/

#ifndef KVSTORE_LISTENER_H_
#define KVSTORE_LISTENER_H_
#ifndef KVSTORE_LISTENER_LISTENER_H_
#define KVSTORE_LISTENER_LISTENER_H_

#include "common/base/Base.h"
#include "common/meta/SchemaManager.h"
Expand Down Expand Up @@ -274,10 +274,9 @@ class Listener : public raftex::RaftPart {
protected:
LogID leaderCommitId_ = 0;
LogID lastApplyLogId_ = 0;
int64_t lastApplyTime_ = 0;
std::set<HostAddr> peers_;
};

} // namespace kvstore
} // namespace nebula
#endif // KVSTORE_LISTENER_H_
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* This source code is licensed under Apache 2.0 License.
*/

#include "kvstore/plugins/elasticsearch/ESListener.h"
#include "kvstore/listener/elasticsearch/ESListener.h"

#include "common/plugin/fulltext/elasticsearch/ESStorageAdapter.h"
#include "common/utils/NebulaKeyUtils.h"
Expand Down Expand Up @@ -321,7 +321,6 @@ void ESListener::processLogs() {
lastApplyLogId_ = lastApplyId;
persist(committedLogId_, term_, lastApplyLogId_);
VLOG(2) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_;
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
}
}

Expand Down Expand Up @@ -351,7 +350,6 @@ std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> ESListener::commitSnapshot
leaderCommitId_ = committedLogId;
lastApplyLogId_ = committedLogId;
persist(committedLogId, committedLogTerm, lastApplyLogId_);
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
LOG(INFO) << folly::sformat(
"Commit snapshot to : committedLogId={},"
"committedLogTerm={}, lastApplyLogId_={}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
* This source code is licensed under Apache 2.0 License.
*/

#ifndef KVSTORE_PLUGINS_ES_LISTENER_H_
#define KVSTORE_PLUGINS_ES_LISTENER_H_
#ifndef KVSTORE_LISTENER_ES_LISTENER_H_
#define KVSTORE_LISTENER_ES_LISTENER_H_

#include "codec/RowReaderWrapper.h"
#include "common/plugin/fulltext/FTStorageAdapter.h"
#include "kvstore/Listener.h"
#include "kvstore/listener/Listener.h"

namespace nebula {
namespace kvstore {
Expand Down Expand Up @@ -171,4 +171,4 @@ class ESListener : public Listener {

} // namespace kvstore
} // namespace nebula
#endif // KVSTORE_PLUGINS_ES_LISTENER_H_
#endif
63 changes: 63 additions & 0 deletions src/kvstore/listener/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright (c) 2022 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.

set(LISTENER_TEST_LIBS
$<TARGET_OBJECTS:listener_obj>
$<TARGET_OBJECTS:kvstore_obj>
$<TARGET_OBJECTS:raftex_obj>
$<TARGET_OBJECTS:wal_obj>
$<TARGET_OBJECTS:disk_man_obj>
$<TARGET_OBJECTS:keyutils_obj>
$<TARGET_OBJECTS:meta_keyutils_obj>
$<TARGET_OBJECTS:log_str_list_iterator_obj>
$<TARGET_OBJECTS:codec_obj>
$<TARGET_OBJECTS:meta_obj>
$<TARGET_OBJECTS:meta_client_obj>
$<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:ws_common_obj>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:raftex_thrift_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:datatypes_obj>
$<TARGET_OBJECTS:conf_obj>
$<TARGET_OBJECTS:thread_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:file_based_cluster_id_man_obj>
$<TARGET_OBJECTS:network_obj>
$<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:stats_obj>
$<TARGET_OBJECTS:expression_obj>
$<TARGET_OBJECTS:ast_match_path_obj>
$<TARGET_OBJECTS:function_manager_obj>
$<TARGET_OBJECTS:wkt_wkb_io_obj>
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
$<TARGET_OBJECTS:process_obj>
$<TARGET_OBJECTS:ft_es_storage_adapter_obj>
$<TARGET_OBJECTS:version_obj>
$<TARGET_OBJECTS:ssl_obj>
$<TARGET_OBJECTS:storage_thrift_obj>
$<TARGET_OBJECTS:geo_index_obj>
$<TARGET_OBJECTS:meta_client_stats_obj>
$<TARGET_OBJECTS:storage_client_stats_obj>
$<TARGET_OBJECTS:kv_stats_obj>
)

nebula_add_test(
NAME
nebula_listener_test
SOURCES
NebulaListenerTest.cpp
OBJECTS
${LISTENER_TEST_LIBS}
LIBRARIES
${THRIFT_LIBRARIES}
${ROCKSDB_LIBRARIES}
${PROXYGEN_LIBRARIES}
wangle
gtest
)
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ class DummyListener : public Listener {
leaderCommitId_ = committedLogId;
lastApplyLogId_ = committedLogId;
persist(committedLogId, committedLogTerm, lastApplyLogId_);
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
LOG(INFO) << folly::sformat(
"Commit snapshot to : committedLogId={},"
"committedLogTerm={}, lastApplyLogId_={}",
Expand Down Expand Up @@ -179,7 +178,6 @@ class DummyListener : public Listener {
lastApplyLogId_ = lastApplyId;
persist(committedLogId_, term_, lastApplyLogId_);
VLOG(2) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_;
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
}
}

Expand Down
5 changes: 0 additions & 5 deletions src/kvstore/plugins/CMakeLists.txt

This file was deleted.

16 changes: 1 addition & 15 deletions src/kvstore/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
set(KVSTORE_TEST_LIBS
$<TARGET_OBJECTS:kvstore_obj>
$<TARGET_OBJECTS:raftex_obj>
$<TARGET_OBJECTS:listener_obj>
$<TARGET_OBJECTS:wal_obj>
$<TARGET_OBJECTS:disk_man_obj>
$<TARGET_OBJECTS:keyutils_obj>
Expand Down Expand Up @@ -107,21 +108,6 @@ nebula_add_test(
gtest
)

nebula_add_test(
NAME
nebula_listener_test
SOURCES
NebulaListenerTest.cpp
OBJECTS
${KVSTORE_TEST_LIBS}
LIBRARIES
${THRIFT_LIBRARIES}
${ROCKSDB_LIBRARIES}
${PROXYGEN_LIBRARIES}
wangle
gtest
)

nebula_add_test(
NAME
rocks_engine_config_test
Expand Down
1 change: 1 addition & 0 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ set(meta_test_deps
$<TARGET_OBJECTS:storage_common_obj>
$<TARGET_OBJECTS:kvstore_obj>
$<TARGET_OBJECTS:raftex_obj>
$<TARGET_OBJECTS:listener_obj>
$<TARGET_OBJECTS:wal_obj>
$<TARGET_OBJECTS:disk_man_obj>
$<TARGET_OBJECTS:keyutils_obj>
Expand Down
1 change: 1 addition & 0 deletions src/storage/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ set(storage_test_deps
$<TARGET_OBJECTS:internal_storage_client_obj>
$<TARGET_OBJECTS:kvstore_obj>
$<TARGET_OBJECTS:raftex_obj>
$<TARGET_OBJECTS:listener_obj>
$<TARGET_OBJECTS:wal_obj>
$<TARGET_OBJECTS:disk_man_obj>
$<TARGET_OBJECTS:keyutils_obj>
Expand Down
1 change: 1 addition & 0 deletions src/tools/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ set(tools_test_deps
$<TARGET_OBJECTS:storage_common_obj>
$<TARGET_OBJECTS:kvstore_obj>
$<TARGET_OBJECTS:raftex_obj>
$<TARGET_OBJECTS:listener_obj>
$<TARGET_OBJECTS:wal_obj>
$<TARGET_OBJECTS:disk_man_obj>
$<TARGET_OBJECTS:codec_obj>
Expand Down

0 comments on commit e1a0283

Please sign in to comment.