Skip to content

Commit

Permalink
fix: configurator: apply app ids (bloomberg#452)
Browse files Browse the repository at this point in the history
Signed-off-by: Jean-Louis Leroy <[email protected]>
  • Loading branch information
jll63 authored Oct 9, 2024
1 parent cbc13da commit 0c88d8c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 8 deletions.
16 changes: 9 additions & 7 deletions src/python/blazingmq/dev/configurator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,27 +216,25 @@ def domain(self, parameters: mqbconf.Domain) -> "Domain":
return self._add_domain(Domain(self, domain))

def broadcast_domain(self, name: str) -> "Domain":
parameters = self.configurator.domain_definition()
parameters = self.configurator.broadcast_domain()
parameters.name = name
parameters.mode = mqbconf.QueueMode(broadcast=mqbconf.QueueModeBroadcast())
parameters.storage.config.in_memory = mqbconf.InMemoryStorage()
parameters.storage.config.file_backed = None
domain = mqbconf.DomainDefinition(self.name, parameters)

return self._add_domain(Domain(self, domain))

def fanout_domain(self, name: str, app_ids: List[str]) -> "Domain":
parameters = self.configurator.domain_definition()
parameters = self.configurator.fanout_domain()
parameters.name = name
parameters.mode = mqbconf.QueueMode(fanout=mqbconf.QueueModeFanout([*app_ids]))
parameters.mode.fanout.app_ids = app_ids.copy()
domain = mqbconf.DomainDefinition(self.name, parameters)

return self._add_domain(Domain(self, domain))

def priority_domain(self, name: str) -> "Domain":
parameters = self.configurator.domain_definition()
parameters = self.configurator.priority_domain()
parameters.name = name
parameters.mode = mqbconf.QueueMode(priority=mqbconf.QueueModePriority())
domain = mqbconf.DomainDefinition(self.name, parameters)

return self._add_domain(Domain(self, domain))
Expand Down Expand Up @@ -314,6 +312,11 @@ class Proto:
domain: mqbconf.Domain = field(
default_factory=functools.partial(
mqbconf.Domain,
mode=mqbconf.QueueMode(
broadcast=mqbconf.QueueModeBroadcast(),
fanout=mqbconf.QueueModeFanout(),
priority=mqbconf.QueueModePriority(),
),
max_delivery_attempts=0,
deduplication_time_ms=300000,
consistency=mqbconf.Consistency(strong=mqbconf.QueueConsistencyStrong()),
Expand All @@ -338,7 +341,6 @@ class Proto:
max_consumers=0,
max_queues=0,
max_idle_time=0,
mode=None, # overwritten
)
)

Expand Down
28 changes: 27 additions & 1 deletion src/python/blazingmq/dev/configurator/configurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,33 @@ def virtual_cluster_definition(self):
return copy.deepcopy(self.proto.virtual_cluster)

def domain_definition(self):
return copy.deepcopy(self.proto.domain)
domain = copy.deepcopy(self.proto.domain)
domain.mode.broadcast = None
domain.mode.fanout = None
domain.mode.priority = None

return domain

def broadcast_domain(self):
domain = copy.deepcopy(self.proto.domain)
domain.mode.fanout = None
domain.mode.priority = None

return domain

def fanout_domain(self):
domain = copy.deepcopy(self.proto.domain)
domain.mode.broadcast = None
domain.mode.priority = None

return domain

def priority_domain(self):
domain = copy.deepcopy(self.proto.domain)
domain.mode.broadcast = None
domain.mode.fanout = None

return domain

def broker(
self,
Expand Down

0 comments on commit 0c88d8c

Please sign in to comment.