Skip to content

Commit

Permalink
Feat[MQB]: Monolithic Virtual Storage (bloomberg#334)
Browse files Browse the repository at this point in the history
* WIP

Signed-off-by: dorjesinpo <[email protected]>

* Monolithic Virtual Storage

Signed-off-by: dorjesinpo <[email protected]>

* formatting

Signed-off-by: dorjesinpo <[email protected]>

* addressing reviews

Signed-off-by: dorjesinpo <[email protected]>

* merge

Signed-off-by: dorjesinpo <[email protected]>

* addressing review

Signed-off-by: dorjesinpo <[email protected]>

* adding UT

Signed-off-by: dorjesinpo <[email protected]>

* merge

Signed-off-by: dorjesinpo <[email protected]>

* addressing review

Signed-off-by: dorjesinpo <[email protected]>

* Rebasing

Signed-off-by: dorjesinpo <[email protected]>

* QE::register/unregisterStorage callbacks

Signed-off-by: dorjesinpo <[email protected]>

* fixing UTs

Signed-off-by: dorjesinpo <[email protected]>

* Cleaning log

Signed-off-by: dorjesinpo <[email protected]>

* fixing UTs

Signed-off-by: dorjesinpo <[email protected]>

---------

Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo authored Oct 1, 2024
1 parent 136ec3d commit f7067f8
Show file tree
Hide file tree
Showing 60 changed files with 4,802 additions and 4,432 deletions.
12 changes: 9 additions & 3 deletions src/groups/mqb/mqba/mqba_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <mqba_domainmanager.h>
#include <mqba_sessionnegotiator.h>
#include <mqbblp_clustercatalog.h>
#include <mqbblp_relayqueueengine.h>
#include <mqbcfg_brokerconfig.h>
#include <mqbcfg_messages.h>
#include <mqbcmd_commandlist.h>
Expand Down Expand Up @@ -159,6 +160,8 @@ Application::Application(bdlmt::EventScheduler* scheduler,
bdlf::PlaceHolders::_2), // allocator
k_BLOB_POOL_GROWTH_STRATEGY,
d_allocators.get("BlobSpPool"))
, d_pushElementsPool(sizeof(mqbblp::PushStream::Element),
d_allocators.get("PushElementsPool"))
, d_allocatorsStatContext_p(allocatorsStatContext)
, d_pluginManager_mp()
, d_statController_mp()
Expand Down Expand Up @@ -269,6 +272,11 @@ int Application::start(bsl::ostream& errorDescription)
}
}

mqbi::ClusterResources resources(d_scheduler_p,
&d_bufferFactory,
&d_blobSpPool,
&d_pushElementsPool);

// Start the StatController
d_statController_mp.load(
new (*d_allocator_p) mqbstat::StatController(
Expand Down Expand Up @@ -355,12 +363,10 @@ int Application::start(bsl::ostream& errorDescription)

// Start the ClusterCatalog
d_clusterCatalog_mp.load(new (*d_allocator_p) mqbblp::ClusterCatalog(
d_scheduler_p,
d_dispatcher_mp.get(),
d_transportManager_mp.get(),
statContextsMap,
&d_bufferFactory,
&d_blobSpPool,
resources,
d_allocators.get("ClusterCatalog")),
d_allocator_p);

Expand Down
2 changes: 2 additions & 0 deletions src/groups/mqb/mqba/mqba_application.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ class Application {

BlobSpPool d_blobSpPool;

bdlma::ConcurrentPool d_pushElementsPool;

mwcst::StatContext* d_allocatorsStatContext_p;
// Stat context of the counting allocators,
// if used
Expand Down
11 changes: 4 additions & 7 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2537,22 +2537,18 @@ Cluster::Cluster(const bslstl::StringRef& name,
bslma::ManagedPtr<mqbnet::Cluster> netCluster,
const StatContextsMap& statContexts,
mqbi::DomainFactory* domainFactory,
bdlmt::EventScheduler* scheduler,
mqbi::Dispatcher* dispatcher,
BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
mqbnet::TransportManager* transportManager,
StopRequestManagerType* stopRequestsManager,
const mqbi::ClusterResources& resources,
bslma::Allocator* allocator,
const mqbnet::Session::AdminCommandEnqueueCb& adminCb)
: d_allocator_p(allocator)
, d_allocators(d_allocator_p)
, d_isStarted(false)
, d_isStopping(false)
, d_clusterData(name,
scheduler,
bufferFactory,
blobSpPool,
resources,
clusterConfig,
mqbcfg::ClusterProxyDefinition(allocator),
netCluster,
Expand Down Expand Up @@ -2588,7 +2584,8 @@ Cluster::Cluster(const bslstl::StringRef& name,
BSLS_ASSERT(d_allocator_p);
mqbnet::Cluster* netCluster_p = d_clusterData.membership().netCluster();
BSLS_ASSERT(netCluster_p && "NetCluster not set !");
BSLS_ASSERT(scheduler->clockType() == bsls::SystemClockType::e_MONOTONIC);
BSLS_ASSERT(resources.scheduler()->clockType() ==
bsls::SystemClockType::e_MONOTONIC);
BSLS_ASSERT_SAFE(d_clusterData.membership().selfNode() &&
"SelfNode not found in cluster!");

Expand Down
18 changes: 8 additions & 10 deletions src/groups/mqb/mqbblp/mqbblp_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,17 +467,17 @@ class Cluster : public mqbi::Cluster,
const bmqp::QueueId& queueId,
mqbc::ClusterNodeSession* ns,
bmqp::EventType::Enum eventType);
// Validate a message of the specified 'eventType' using the specified
// 'queueId' and 'ns'. Return one of `ValidationResult` values. Populate
// the specified 'queueHandle' if the queue is found.
// Validate a message of the specified `eventType` using the specified
// `queueId` and `ns`. Return one of `ValidationResult` values. Populate
// the specified `queueHandle` if the queue is found.

bool validateRelayMessage(mqbc::ClusterNodeSession** ns,
bsl::ostream* errorStream,
const int pid);
// Validate a relay message using the specified 'pid'. Return true if the
// message is valid and false otherwise. Populate the specified 'ns' if the
// Validate a relay message using the specified `pid`. Return true if the
// message is valid and false otherwise. Populate the specified `ns` if the
// message is valid or load a descriptive error message into the
// 'errorStream' if the message is invalid.
// `errorStream` if the message is invalid.

/// Executes in any thread.
void
Expand Down Expand Up @@ -541,19 +541,17 @@ class Cluster : public mqbi::Cluster,
/// Create a new object representing a cluster having the specified
/// `name`, `clusterConfig` and `statContexts`, associated to the
/// specified `netCluster` and using the specified `domainFactory`,
/// `scheduler`, `dispatcher`, `blobSpPool` and `bufferFactory`. Use
/// `scheduler`, `dispatcher`, `transportManager`, and `resources`. Use
/// the specified `allocator` for any memory allocation.
Cluster(const bslstl::StringRef& name,
const mqbcfg::ClusterDefinition& clusterConfig,
bslma::ManagedPtr<mqbnet::Cluster> netCluster,
const StatContextsMap& statContexts,
mqbi::DomainFactory* domainFactory,
bdlmt::EventScheduler* scheduler,
mqbi::Dispatcher* dispatcher,
BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
mqbnet::TransportManager* transportManager,
StopRequestManagerType* stopRequestsManager,
const mqbi::ClusterResources& resources,
bslma::Allocator* allocator,
const mqbnet::Session::AdminCommandEnqueueCb& adminCb);

Expand Down
30 changes: 11 additions & 19 deletions src/groups/mqb/mqbblp/mqbblp_clustercatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,10 @@ int ClusterCatalog::createCluster(bsl::ostream& errorDescription,
netCluster,
d_statContexts,
d_domainFactory_p,
d_scheduler_p,
d_dispatcher_p,
d_blobSpPool_p,
d_bufferFactory_p,
d_transportManager_p,
&d_stopRequestsManager,
d_resources,
clusterAllocator,
d_adminCb);

Expand Down Expand Up @@ -226,12 +224,10 @@ int ClusterCatalog::createCluster(bsl::ostream& errorDescription,
clusterProxyDefinition,
netCluster,
d_statContexts,
d_scheduler_p,
d_bufferFactory_p,
d_blobSpPool_p,
d_dispatcher_p,
d_transportManager_p,
&d_stopRequestsManager,
d_resources,
clusterAllocator);

info.d_cluster_sp.reset(cluster, clusterAllocator);
Expand Down Expand Up @@ -362,20 +358,15 @@ int ClusterCatalog::initiateReversedClusterConnectionsImp(
return rc;
}

ClusterCatalog::ClusterCatalog(bdlmt::EventScheduler* scheduler,
mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
const StatContextsMap& statContexts,
bdlbb::BlobBufferFactory* bufferFactory,
BlobSpPool* blobSpPool,
bslma::Allocator* allocator)
ClusterCatalog::ClusterCatalog(mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
const StatContextsMap& statContexts,
const mqbi::ClusterResources& resources,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_allocators(d_allocator_p)
, d_isStarted(false)
, d_scheduler_p(scheduler)
, d_dispatcher_p(dispatcher)
, d_bufferFactory_p(bufferFactory)
, d_blobSpPool_p(blobSpPool)
, d_transportManager_p(transportManager)
, d_domainFactory_p(0)
, d_clustersDefinition(d_allocator_p)
Expand All @@ -386,14 +377,15 @@ ClusterCatalog::ClusterCatalog(bdlmt::EventScheduler* scheduler,
, d_clusters(d_allocator_p)
, d_statContexts(statContexts)
, d_requestManager(bmqp::EventType::e_CONTROL,
d_bufferFactory_p,
d_scheduler_p,
resources.bufferFactory(),
resources.scheduler(),
false, // lateResponseMode
d_allocator_p)
, d_stopRequestsManager(&d_requestManager, d_allocator_p)
, d_resources(resources)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(scheduler->clockType() ==
BSLS_ASSERT_SAFE(d_resources.scheduler()->clockType() ==
bsls::SystemClockType::e_MONOTONIC);
}

Expand Down
40 changes: 10 additions & 30 deletions src/groups/mqb/mqbblp/mqbblp_clustercatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,6 @@ class ClusterCatalog {
// FRIENDS
friend class ClusterCatalogIterator;

public:
// TYPES

/// Pool of shared pointers to Blobs
typedef bdlcc::SharedObjectPool<
bdlbb::Blob,
bdlcc::ObjectPoolFunctors::DefaultCreator,
bdlcc::ObjectPoolFunctors::RemoveAll<bdlbb::Blob> >
BlobSpPool;

public:
// TYPES

Expand Down Expand Up @@ -225,19 +215,9 @@ class ClusterCatalog {
bool d_isStarted;
// True if this component is started

bdlmt::EventScheduler* d_scheduler_p;
// EventScheduler to use

mqbi::Dispatcher* d_dispatcher_p;
// Dispatcher to use

bdlbb::BlobBufferFactory* d_bufferFactory_p;
// Blob buffer factory to use

BlobSpPool* d_blobSpPool_p;
// Pool of shared pointers to blob to
// use.

mqbnet::TransportManager* d_transportManager_p;
// TransportManager for creating
// mqbnet::Cluster
Expand Down Expand Up @@ -290,6 +270,8 @@ class ClusterCatalog {
StatContextsMap d_statContexts;
// Map of stat contexts

const mqbi::ClusterResources d_resources;

mqbnet::Session::AdminCommandEnqueueCb d_adminCb;
// Callback function to enqueue admin commands

Expand Down Expand Up @@ -352,16 +334,14 @@ class ClusterCatalog {

// CREATORS

/// Create a new object using the specified `scheduler`, `dispatcher`,
/// `transportManager`, `bufferFactory`, `blobSpPool`, and the specified
/// `allocator`.
ClusterCatalog(bdlmt::EventScheduler* scheduler,
mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
const StatContextsMap& statContexts,
bdlbb::BlobBufferFactory* bufferFactory,
BlobSpPool* blobSpPool,
bslma::Allocator* allocator);
/// Create a new object using the specified 'dispatcher',
/// 'transportManager', 'statContexts', 'resources', and the specified
/// 'allocator'.
ClusterCatalog(mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
const StatContextsMap& statContexts,
const mqbi::ClusterResources& resources,
bslma::Allocator* allocator);

/// Destructor.
~ClusterCatalog();
Expand Down
10 changes: 3 additions & 7 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1047,20 +1047,16 @@ ClusterProxy::ClusterProxy(
const mqbcfg::ClusterProxyDefinition& clusterProxyConfig,
bslma::ManagedPtr<mqbnet::Cluster> netCluster,
const StatContextsMap& statContexts,
bdlmt::EventScheduler* scheduler,
bdlbb::BlobBufferFactory* bufferFactory,
BlobSpPool* blobSpPool,
mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
StopRequestManagerType* stopRequestsManager,
const mqbi::ClusterResources& resources,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_isStarted(false)
, d_isStopping(false)
, d_clusterData(name,
scheduler,
bufferFactory,
blobSpPool,
resources,
mqbcfg::ClusterDefinition(allocator),
clusterProxyConfig,
netCluster,
Expand Down Expand Up @@ -1088,7 +1084,7 @@ ClusterProxy::ClusterProxy(
// PRECONDITIONS
mqbnet::Cluster* netCluster_p = d_clusterData.membership().netCluster();
BSLS_ASSERT_SAFE(netCluster_p && "NetCluster not set !");
BSLS_ASSERT_SAFE(scheduler->clockType() ==
BSLS_ASSERT_SAFE(resources.scheduler()->clockType() ==
bsls::SystemClockType::e_MONOTONIC);

d_clusterData.clusterConfig().queueOperations() =
Expand Down
19 changes: 7 additions & 12 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,8 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
mqbnet::ClusterNode* node,
BSLS_ANNOTATION_UNUSED const bmqp_ctrlmsg::ClientIdentity& identity);

// Executed by the dispatcher thread when there is a change in the
// specified 'node' connectivity. If not empty, the specified
// 'session' points to newly connected 'Session'. Empty 'session'
// indicates loss of connectivity.

/// Executed by the dispatcher thread when the specified `node` becomes
/// unavailable.
void onNodeDownDispatched(mqbnet::ClusterNode* node);

/// Callback method when the `activeNodeLookupEvent` has expired.
Expand Down Expand Up @@ -429,20 +426,18 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
// CREATORS

/// Create a new object representing a cluster having the specified
/// `name`, `clusterProxyConfig` and `statContexts`, associated to the
/// specified `netCluster` and using the specified `scheduler`,
/// `bufferFactory`, `blobSpPool` and `dispatcher`. Use the specified
/// `allocator` for any memory allocation.
/// `name`, `clusterConfig` and `statContexts`, associated to the
/// specified `netCluster` and using the specified `domainFactory`,
/// `scheduler`, `dispatcher`, `transportManager`, and `resources`. Use
/// the specified `allocator` for any memory allocation.
ClusterProxy(const bslstl::StringRef& name,
const mqbcfg::ClusterProxyDefinition& clusterProxyConfig,
bslma::ManagedPtr<mqbnet::Cluster> netCluster,
const StatContextsMap& statContexts,
bdlmt::EventScheduler* scheduler,
bdlbb::BlobBufferFactory* bufferFactory,
BlobSpPool* blobSpPool,
mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
StopRequestManagerType* stopRequestsManager,
const mqbi::ClusterResources& resources,
bslma::Allocator* allocator);

/// Destructor
Expand Down
Loading

0 comments on commit f7067f8

Please sign in to comment.