Dagon in Pony

Introduction

Dagon manages the lifecycle of all components of a Buffy topology. The components are booted using a ProcessMonitor from the process package. Each component is forked off as a separate Pony runtime process and reports back via TCP with a ready message to Dagon’s control socket. We use OSC as our wire protocol for messages between components.
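To make the wire format concrete, here is a minimal sketch of how an argument-free OSC message is laid out, following the OSC 1.0 rule that strings are NUL-terminated and padded to a multiple of four bytes. The `OSC` primitive, its method names, and the `/ready` address are hypothetical and are not taken from Dagon. The code assumes a recent ponyc release.

```pony
primitive OSC
  fun padded(s: String): Array[U8] val =>
    """
    Encode s as an OSC-string: its bytes, a NUL terminator, then NUL padding
    up to a multiple of four bytes.
    """
    recover val
      let out = Array[U8]
      out.append(s.array())
      out.push(0)
      while (out.size() % 4) != 0 do
        out.push(0)
      end
      out
    end

  fun message(address: String): Array[U8] val =>
    """
    Encode an argument-free OSC message: the address pattern followed by the
    type tag string ",", which declares zero arguments.
    """
    let addr = padded(address)
    let tags = padded(",")
    recover val
      let out = Array[U8]
      out.append(addr)
      out.append(tags)
      out
    end

actor Main
  new create(env: Env) =>
    // "/ready" is a hypothetical address, not one taken from Dagon.
    let msg = OSC.message("/ready")
    env.out.print(msg.size().string() + " bytes")
```

With this layout `/ready` occupies 8 bytes and the type tag string 4 bytes. OSC 1.0 also specifies that on a stream transport such as TCP each packet is preceded by an int32 giving its size.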

Design

Control Messages

We define message types that will be used for communication between Dagon and the components it manages.

Boot Topology

We start the full topology defined in the ini file specified by the user. Once we’ve received the ready message from all booted nodes, we send the start message to Giles Sender.

msc{
  width = 800;

  D  [label = "Dagon"],
  GR [label = "Giles Receiver"],
  L  [label = "Leader"],
  W1 [label = "Worker 1"],
  W2 [label = "Worker 2"],
  GS [label = "Giles Sender"];
  |||;  
  D -> D   [label = "fork Giles Receiver", textcolour = "red"];
  D <- GR  [label = "ready"];
  D -> D   [label = "fork Leader", textcolour = "red"];
  D <- L   [label = "ready"];
  D -> D   [label = "fork Worker 1", textcolour = "red"];
  D <- W1  [label = "ready"];
  D -> D   [label = "fork Worker 2", textcolour = "red"];
  D <- W2  [label = "ready"];
  D -> D   [label = "fork Giles Sender", textcolour = "red"];
  D <- GS  [label = "ready"];
  D -> D   [label = "are_we_ready()"];
  |||;  
  D -> GS  [label = "start"];
  |||;  
}

./dagon-sequence-1.png
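The `are_we_ready()` step in the diagram can be sketched as a set of nodes that still owe Dagon a ready message. This is a minimal sketch rather than Dagon's implementation: `ReadyTracker`, its methods, and the node names are hypothetical, and the code assumes a recent ponyc release (array literals separated by `;`).

```pony
use "collections"

class ReadyTracker
  """
  Hypothetical helper, not part of the Dagon source: tracks which booted
  nodes have not yet sent their ready message.
  """
  let _pending: Set[String] = Set[String]

  new create(nodes: Array[String] val) =>
    for name in nodes.values() do
      _pending.set(name)
    end

  fun ref ready(name: String) =>
    """Record a ready message from the named node."""
    _pending.unset(name)

  fun are_we_ready(): Bool =>
    """True once every expected node has reported ready."""
    _pending.size() == 0

actor Main
  new create(env: Env) =>
    let nodes: Array[String] val = recover val
      ["giles-receiver"; "leader"; "worker-1"; "worker-2"; "giles-sender"]
    end
    let tracker = ReadyTracker(nodes)
    // Simulate every node reporting back over the control socket.
    for name in nodes.values() do
      tracker.ready(name)
    end
    if tracker.are_we_ready() then
      env.out.print("all nodes ready: send start to Giles Sender")
    end
```

Keeping a set of pending names, rather than a counter, means a duplicate ready message from the same node cannot trigger the start message early.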

Shutdown Topology

We expect Giles Sender to complete first and send the done message to Dagon. We then wait for a specified period before initiating the shutdown procedure. The shutdown message goes to the Leader first, which gives it time for cleanup and any remaining communication with the workers. Once we receive the done_shutdown message from the Leader, we send the shutdown message to all workers, Giles Sender, and Giles Receiver and wait for their responses. Once we’ve received done_shutdown from all outstanding nodes, we are done and exit.

msc{
  width = 800;

  D  [label = "Dagon"],
  GR [label = "Giles Receiver"],
  L  [label = "Leader"],
  W1 [label = "Worker 1"],
  W2 [label = "Worker 2"],
  GS [label = "Giles Sender"];
  |||;  
  D <- GS  [label = "done"];
  D <- GR  [label = "done"];
  D -> L   [label = "shutdown"];
  D <- L   [label = "done_shutdown"];
  D -> W1  [label = "shutdown"];
  D -> W2  [label = "shutdown"];
  D <- W1  [label = "done_shutdown"];
  D <- W2  [label = "done_shutdown"];
  D -> GS  [label = "shutdown"];
  D <- GS  [label = "done_shutdown"];  
  D -> GR  [label = "shutdown"];
  D <- GR  [label = "done_shutdown"];
  |||;  
  D -> D   [label = "are_we_done_yet()"];
  D -> D   [label = "shutdown_topology()"];
  |||;  
}

./dagon-sequence-2.png
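The ordering above (Leader first, then everyone else) can be sketched as a small phase tracker. This is a sketch under stated assumptions, not Dagon's code: the phase primitives, `ShutdownTracker`, and the node names are hypothetical, the wait period is left out, and a recent ponyc release is assumed.

```pony
use "collections"

primitive AwaitingDone
primitive AwaitingLeader
primitive AwaitingOthers
primitive Finished

type Phase is (AwaitingDone | AwaitingLeader | AwaitingOthers | Finished)

class ShutdownTracker
  """
  Hypothetical helper, not part of the Dagon source: follows the shutdown
  order described in the text.
  """
  var _phase: Phase = AwaitingDone
  let _outstanding: Set[String] = Set[String]

  new create(others: Array[String] val) =>
    // Every node except the Leader: workers, Giles Sender, Giles Receiver.
    for name in others.values() do
      _outstanding.set(name)
    end

  fun ref sender_done() =>
    """Giles Sender reported done; the caller now sends shutdown to the Leader."""
    if _phase is AwaitingDone then
      _phase = AwaitingLeader
    end

  fun ref leader_done_shutdown() =>
    """Leader acknowledged; the caller now sends shutdown to all other nodes."""
    if _phase is AwaitingLeader then
      _phase = AwaitingOthers
    end

  fun ref node_done_shutdown(name: String) =>
    """Record done_shutdown from a worker or a Giles node."""
    if _phase is AwaitingOthers then
      _outstanding.unset(name)
      if _outstanding.size() == 0 then
        _phase = Finished
      end
    end

  fun are_we_done_yet(): Bool =>
    """True once every outstanding node has acknowledged shutdown."""
    _phase is Finished

actor Main
  new create(env: Env) =>
    let others: Array[String] val = recover val
      ["worker-1"; "worker-2"; "giles-sender"; "giles-receiver"]
    end
    let tracker = ShutdownTracker(others)
    tracker.sender_done()
    tracker.leader_done_shutdown()
    for name in others.values() do
      tracker.node_done_shutdown(name)
    end
    if tracker.are_we_done_yet() then
      env.out.print("all nodes shut down: exit")
    end
```

Messages that arrive in the wrong phase are ignored, so a worker's done_shutdown cannot be counted before the Leader has acknowledged.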

First Draft

Notes

Dagon Calls

./dagon-pony -f dagon-child/dagon-child -n dagon-child -h 127.0.0.1 -p 8080

TCPListener

Listen on a specific port

let tcp_auth = TCPListenAuth(env.root as AmbientAuth)
let from_buffy_listener = TCPListener(tcp_auth,
  FromBuffyListenerNotify(coordinator, store),
  listener_addr(0), // ip addr
  listener_addr(1)) // port

A Notifier

class FromBuffyListenerNotify is TCPListenNotify
  let _coordinator: Coordinator
  let _store: Store

  new iso create(coordinator: Coordinator, store: Store) =>
    _coordinator = coordinator
    _store = store

  fun ref not_listening(listen: TCPListener ref) =>
    _coordinator.from_buffy_listener(listen, Failed)

  fun ref listening(listen: TCPListener ref) =>
    _coordinator.from_buffy_listener(listen, Ready)

  fun ref connected(listen: TCPListener ref): TCPConnectionNotify iso^ =>
    FromBuffyNotify(_store)

Decode Incoming Data

class FromBuffyNotify is TCPConnectionNotify
  let _store: Store
  let _framer: Framer = Framer

  new iso create(store: Store) =>
    _store = store

  fun ref received(conn: TCPConnection ref, data: Array[U8] iso): Bool =>
    for chunked in _framer.chunk(consume data).values() do
      try
        let decoded = WireMsgDecoder(consume chunked)
        match decoded
        | let d: ExternalMsg val =>
          @printf[I32]("%s\n".cstring(), d.data.cstring())
          _store.received(d.data, Time.micros())
        else
          @printf[I32]("UNEXPECTED DATA\n".cstring())
        end
      else
        @printf[I32]("UNABLE TO DECODE MESSAGE\n".cstring())
      end
    end
    true

Actor as Child Roster

class MarkusListener is TCPListenNotify
  let _all_children: AllChildrenActor

  new iso create(ac: AllChildrenActor) =>
    _all_children = ac

  fun ref connected(listen: TCPListener ref): TCPConnectionNotify iso^ =>
    MyChildConnectionNotifier(_all_children)

Map a Connection to Child

Once we receive the first ready message from a child, we send an update to the all_children actor. From then on we know which child is talking over which connection.

class MyChildConnectionNotifier is TCPConnectionNotify
  let _all_children: AllChildrenActor

  new iso create(ac: AllChildrenActor) =>
    _all_children = ac

  fun ref received(…) =>
    // Figure out which child is on this connection, then send _all_children
    // a message to update its record for that child.
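One way the roster side of this could look is an actor that maps each connection to a child name by identity. This is only a sketch: the real `AllChildrenActor` interface is not shown in these notes, so the `child_ready` and `report` behaviours are hypothetical.

```pony
use "collections"
use "net"

actor AllChildrenActor
  """
  Hypothetical roster sketch; the behaviours below are assumptions, not the
  interface used by Dagon.
  """
  // Keyed by connection identity, so TCPConnection needs no hash function.
  let _names: MapIs[TCPConnection, String] = MapIs[TCPConnection, String]

  be child_ready(conn: TCPConnection, name: String) =>
    """Called by a connection notifier after the first ready message."""
    _names(conn) = name

  be report(out: OutStream) =>
    """Print how many children have identified themselves so far."""
    out.print(_names.size().string() + " children known")

actor Main
  new create(env: Env) =>
    let roster = AllChildrenActor
    roster.report(env.out)
```

Inside a notifier's `received`, the call would be along the lines of `_all_children.child_ready(conn, name)`, where `name` is whatever identifier the decoded ready message carries.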

Read Chunked Data

for chunked in _framer.chunk(consume data).values() do
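The `Framer` class itself is not shown in these notes. As an illustration of what `chunk` has to do, here is a sketch of a framer that assumes each message is preceded by a 4-byte big-endian length, which is the framing OSC 1.0 describes for stream transports. `LengthFramer` is a hypothetical stand-in, and the code assumes a recent ponyc release where partial calls are marked with `?`.

```pony
use "buffered"

class LengthFramer
  """
  Hypothetical stand-in for Framer: buffers incoming bytes and returns every
  complete length-prefixed chunk received so far.
  """
  let _reader: Reader = Reader

  fun ref chunk(data: Array[U8] val): Array[Array[U8] val] =>
    _reader.append(data)
    let chunks = Array[Array[U8] val]
    try
      while _reader.size() >= 4 do
        // Peek at the header without consuming it, in case the body is
        // still incomplete.
        let len = _reader.peek_u32_be()?.usize()
        if _reader.size() < (4 + len) then
          break
        end
        _reader.skip(4)?
        chunks.push(_reader.block(len)?)
      end
    end
    chunks

actor Main
  new create(env: Env) =>
    let framer = LengthFramer
    // One complete 2-byte frame ("hi") followed by half of the next header.
    let data: Array[U8] val = recover val
      [as U8: 0; 0; 0; 2; 'h'; 'i'; 0; 0]
    end
    let chunks = framer.chunk(data)
    env.out.print(chunks.size().string() + " complete chunk(s)")
```

Leftover bytes stay in the reader, so a message split across two `received` calls is returned once its remaining bytes arrive.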

Code Review #1