Skip to content

Commit

Permalink
Ensure that OmniRouter is kept up to date at RouterRegistry
Browse files Browse the repository at this point in the history
Because of race conditions around when different routers and
components are registered at the RouterRegistry, we need to
ensure that the OmniRouter is kept up to date no matter when
in the sequence it is registered. Previously, the code assumed
that it would be registered first, but this is not a safe
assumption.
  • Loading branch information
jtfmumm authored and nisanharamati committed Mar 27, 2018
1 parent 1ab7c1e commit 2c5de86
Showing 1 changed file with 31 additions and 11 deletions.
42 changes: 31 additions & 11 deletions lib/wallaroo/ent/router_registry/router_registry.pony
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ actor RouterRegistry is InFlightAckRequester
_partition_routers.create()
let _stateless_partition_routers: Map[U128, StatelessPartitionRouter] =
_stateless_partition_routers.create()
let _data_receiver_map: Map[String, DataReceiver] =
_data_receiver_map.create()

var _local_topology_initializer: (LocalTopologyInitializer | None) = None

Expand All @@ -72,7 +74,7 @@ actor RouterRegistry is InFlightAckRequester
//
////////////////

let _sources: SetIs[Source] = _sources.create()
let _sources: Map[StepId, Source] = _sources.create()
let _source_listeners: SetIs[SourceListener] = _source_listeners.create()
// Map from Source digestof value to source id
let _source_ids: Map[U64, StepId] = _source_ids.create()
Expand Down Expand Up @@ -165,7 +167,20 @@ actor RouterRegistry is InFlightAckRequester
_stateless_partition_routers(partition_id) = pr

be set_omni_router(o: OmniRouter) =>
_omni_router = o
_set_omni_router(o)

fun ref _set_omni_router(o: OmniRouter) =>
var omn = o
for (w, dr) in _data_receiver_map.pairs() do
omn = o.add_data_receiver(w, dr)
end
for (w, ob) in _outgoing_boundaries.pairs() do
omn = o.add_boundary(w, ob)
end
for (s_id, s) in _sources.pairs() do
omn = o.add_source(s_id, s)
end
_omni_router = omn

be set_event_log(e: EventLog) =>
_event_log = e
Expand All @@ -180,7 +195,7 @@ actor RouterRegistry is InFlightAckRequester
_connections.register_disposable(d)

be register_source(source: Source, source_id: StepId) =>
_sources.set(source)
_sources(source_id) = source
_source_ids(digestof source) = source_id
if not _stop_the_world_in_process and _application_ready_to_work then
source.unmute(_dummy_consumer)
Expand Down Expand Up @@ -365,12 +380,11 @@ actor RouterRegistry is InFlightAckRequester
end

be register_data_receiver(worker: String, dr: DataReceiver) =>
_data_receiver_map(worker) = dr
match _omni_router
| let omnr: OmniRouter =>
_omni_router = omnr.add_data_receiver(worker, dr)
_distribute_omni_router()
else
Fail()
end

fun _distribute_data_router() =>
Expand Down Expand Up @@ -432,6 +446,15 @@ actor RouterRegistry is InFlightAckRequester
end
_distribute_omni_router()

fun ref _remove_worker(worker: String) =>
try
_data_receiver_map.remove(worker)?
else
Fail()
end
_remove_worker_from_omni_router(worker)
_distribute_boundary_removal(worker)

fun ref _remove_worker_from_omni_router(worker: String) =>
match _omni_router
| let omnr: OmniRouter =>
Expand All @@ -441,8 +464,6 @@ actor RouterRegistry is InFlightAckRequester
_distribute_omni_router()

fun ref _distribute_boundary_removal(worker: String) =>
_remove_worker_from_omni_router(worker)

for subs in _partition_router_subs.values() do
for sub in subs.values() do
match sub
Expand Down Expand Up @@ -530,7 +551,7 @@ actor RouterRegistry is InFlightAckRequester
let obs = consume val obs_trn
let new_omni_router = omni_router_blueprint.build_router(_worker_name,
obs, local_sinks)
_omni_router = new_omni_router
_set_omni_router(new_omni_router)
lti.set_omni_router(new_omni_router)
lti.initialize_join_initializables()

Expand Down Expand Up @@ -1296,7 +1317,7 @@ actor RouterRegistry is InFlightAckRequester
be disconnect_from_leaving_worker(worker: String) =>
_connections.disconnect_from(worker)
try
_distribute_boundary_removal(worker)
_remove_worker(worker)
_outgoing_boundaries.remove(worker)?
_outgoing_boundaries_builders.remove(worker)?
else
Expand Down Expand Up @@ -1383,8 +1404,7 @@ actor RouterRegistry is InFlightAckRequester
=>
match _omni_router
| let o: OmniRouter =>
let new_omni_router = o.update_route_to_proxy(id, proxy_address)
_omni_router = new_omni_router
_omni_router = o.update_route_to_proxy(id, proxy_address)
else
Fail()
end
Expand Down

0 comments on commit 2c5de86

Please sign in to comment.