Dagon manages the lifecycle of all components of a Buffy topology. The
components are booted using a `ProcessMonitor` from the `process` package.
Each component is forked off as a separate Pony runtime process and reports
back via TCP with a `ready` message to Dagon’s control socket. We use OSC as
our wire protocol for messages between components.
We define message types that will be used for communication between Dagon and the components it manages.
We start the full topology defined in the ini file specified by the user.
Once we’ve received the `ready` message from all booted nodes, we send the
`start` message to Giles Sender.
msc{
# Startup sequence: Dagon forks each component in turn, each component
# answers with "ready", and Dagon finally sends "start" to Giles Sender.
width = 800;
# Participants, one column each, in left-to-right order.
D [label = "Dagon"],
GR [label = "Giles Receiver"],
L [label = "Leader"],
W1 [label = "Worker 1"],
W2 [label = "Worker 2"],
GS [label = "Giles Sender"];
|||;
# Each fork is drawn as a self-arc on Dagon (red label), immediately
# followed by the "ready" message from the component that was forked.
# Fork order: Giles Receiver, Leader, Worker 1, Worker 2, Giles Sender.
D -> D [label = "fork Giles Receiver", textcolour = "red"];
D <- GR [label = "ready"];
D -> D [label = "fork Leader", textcolour = "red"];
D <- L [label = "ready"];
D -> D [label = "fork Worker 1", textcolour = "red"];
D <- W1 [label = "ready"];
D -> D [label = "fork Worker 2", textcolour = "red"];
D <- W2 [label = "ready"];
D -> D [label = "fork Giles Sender", textcolour = "red"];
D <- GS [label = "ready"];
# Readiness check, shown as a step internal to Dagon.
D -> D [label = "are_we_ready()"];
|||;
# Only Giles Sender receives "start".
D -> GS [label = "start"];
|||;
}
We expect Giles Sender to complete first and send the `done` message to
Dagon. We wait for a specified period of time and then initiate the shutdown
procedure by sending the `shutdown` message to the Leader, to give it time
for cleanup and any remaining communication with the workers. We wait for the
Leader’s response. Once we receive the `done_shutdown` message from the
Leader, we send the `shutdown` message to all workers, Giles Sender, and
Giles Receiver and wait for their responses. Once we’ve received
`done_shutdown` from all outstanding nodes, we are done and exit.
msc{
# Shutdown sequence: "done" arrives from the Giles components, then Dagon
# sends "shutdown" to each component and receives "done_shutdown" from each.
width = 800;
# Participants, one column each, in left-to-right order.
D [label = "Dagon"],
GR [label = "Giles Receiver"],
L [label = "Leader"],
W1 [label = "Worker 1"],
W2 [label = "Worker 2"],
GS [label = "Giles Sender"];
|||;
# Completion reports from Giles Sender and Giles Receiver.
D <- GS [label = "done"];
D <- GR [label = "done"];
# The Leader is shut down first and acknowledges before anyone else is asked.
D -> L [label = "shutdown"];
D <- L [label = "done_shutdown"];
# Both workers are asked to shut down, then both acknowledge.
D -> W1 [label = "shutdown"];
D -> W2 [label = "shutdown"];
D <- W1 [label = "done_shutdown"];
D <- W2 [label = "done_shutdown"];
# Giles Sender, then Giles Receiver, each acknowledging in turn.
D -> GS [label = "shutdown"];
D <- GS [label = "done_shutdown"];
D -> GR [label = "shutdown"];
D <- GR [label = "done_shutdown"];
|||;
# Final steps internal to Dagon.
D -> D [label = "are_we_done_yet()"];
D -> D [label = "shutdown_topology()"];
|||;
}
./dagon-pony -f dagon-child/dagon-child -n dagon-child -h 127.0.0.1 -p 8080
// Capability to open a listening TCP socket, derived from the ambient root.
let tcp_auth = TCPListenAuth(env.root as AmbientAuth)
// Notifier that receives the listener's lifecycle callbacks.
let listen_notify = FromBuffyListenerNotify(coordinator, store)
let listen_host = listener_addr(0) // ip addr
let listen_service = listener_addr(1) // port
let from_buffy_listener = TCPListener(tcp_auth, consume listen_notify,
  listen_host, listen_service)
class FromBuffyListenerNotify is TCPListenNotify
  """
  Listener-side notifier for connections coming from Buffy. It passes the
  listener's status (Ready or Failed) to the coordinator and gives every
  accepted connection a FromBuffyNotify built around the same store.
  """
  let _store: Store
  let _coordinator: Coordinator

  new iso create(coordinator: Coordinator, store: Store) =>
    """
    Keep the coordinator to report to and the store to pass on to each
    connection notifier.
    """
    _store = store
    _coordinator = coordinator

  fun ref listening(listen: TCPListener ref) =>
    """
    The listener is up: report it to the coordinator as Ready.
    """
    _coordinator.from_buffy_listener(listen, Ready)

  fun ref not_listening(listen: TCPListener ref) =>
    """
    The listener could not be set up: report it to the coordinator as Failed.
    """
    _coordinator.from_buffy_listener(listen, Failed)

  fun ref connected(listen: TCPListener ref): TCPConnectionNotify iso^ =>
    """
    Create the notifier for a newly accepted connection.
    """
    FromBuffyNotify(_store)
class FromBuffyNotify is TCPConnectionNotify
  """
  Connection-side notifier for data arriving from Buffy. Incoming bytes are
  split into chunks by the framer, each chunk is decoded, and the payload of
  every ExternalMsg is printed and handed to the store.
  """
  let _framer: Framer = Framer
  let _store: Store

  new iso create(store: Store) =>
    """
    Keep the store that decoded messages are recorded in.
    """
    _store = store

  fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
    """
    Process one batch of received bytes, chunk by chunk.

    For each chunk produced by the framer:
    - an ExternalMsg has its data printed and passed to the store together
      with the current Time.micros() value;
    - any other decoded value is reported as unexpected;
    - a decoding error is reported and the loop moves on to the next chunk.

    Always returns true.
    """
    for frame in _framer.chunk(consume data).values() do
      try
        // Match directly on the decoder result; only ExternalMsg is handled.
        match WireMsgDecoder(consume frame)
        | let external: ExternalMsg val =>
          @printf[I32]("%s\n".cstring(), external.data.cstring())
          _store.received(external.data, Time.micros())
        else
          @printf[I32]("UNEXPECTED DATA\n".cstring())
        end
      else
        // Reached when the decoder raises an error for this chunk.
        @printf[I32]("UNABLE TO DECODE MESSAGE\n".cstring())
      end
    end
    true
class MarkusListener is TCPListenNotify
  """
  Listener notifier that equips every accepted connection with a
  MyChildConnectionNotifier holding the AllChildrenActor.
  """
  let _all_children: AllChildrenActor

  new iso create(ac: AllChildrenActor) =>
    """
    Keep the actor that the per-connection notifiers will be given.
    """
    _all_children = ac

  fun ref connected(listen: TCPListener ref): TCPConnectionNotify iso^ =>
    """
    Build the notifier for a newly accepted connection.
    """
    let child_notify: TCPConnectionNotify iso =
      MyChildConnectionNotifier(_all_children)
    consume child_notify
Once we receive the first `ready` message from a child, we send an update to
the `all_children` actor. Now we know which child is talking over which
connection.
class MyChildConnectionNotifier is TCPConnectionNotify
let _all_children: AllChildrenActor
new iso create(ac: AllChildrenActor)
_all_children = ac
fun ref received(…) =>
// figure out who you are talking to, send _all_children message to update info about that person
for chunked in _framer.chunk(consume data).values() do