Skip to content

Commit

Permalink
Add state callback and test
Browse files Browse the repository at this point in the history
  • Loading branch information
fjordan committed Sep 2, 2020
1 parent d061fa3 commit b75ccf3
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 0 deletions.
6 changes: 6 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,12 @@ type Config struct {
ProgressCallback HTTPCallback
ProgressReportFrequency int

// Report state via an HTTP callback. The Payload field of the callback
// will be sent to the server as the CustomPayload field in the State
// struct The unit of StateReportFrequency is in milliseconds.
StateCallback HTTPCallback
StateReportFrequency int

// The state to resume from as dumped by the PanicErrorHandler.
// If this is null, a new Ghostferry run will be started. Otherwise, the
// reconciliation process will start and Ghostferry will resume after that.
Expand Down
33 changes: 33 additions & 0 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,24 @@ func (f *Ferry) Run() {
}()
}

if f.Config.StateCallback.URI != "" {
supportingServicesWg.Add(1)
go func() {
defer supportingServicesWg.Done()

frequency := time.Duration(f.Config.StateReportFrequency) * time.Millisecond

for {
select {
case <-ctx.Done():
return
case <-time.After(frequency):
f.ReportState()
}
}
}()
}

if f.DumpStateOnSignal {
go func() {
c := make(chan os.Signal, 1)
Expand Down Expand Up @@ -890,6 +908,21 @@ func (f *Ferry) ReportProgress() {
}
}

func (f *Ferry) ReportState() {
callback := f.Config.StateCallback
state, err := f.SerializeStateToJSON()
if err != nil {
f.logger.WithError(err).Error("failed to dump state to JSON")
return
}

callback.Payload = string(state)
err = callback.Post(&http.Client{})
if err != nil {
f.logger.WithError(err).Errorf("failed to post state to callback: %s", callback)
}
}

func (f *Ferry) waitUntilAutomaticCutoverIsTrue() {
for !f.AutomaticCutover {
time.Sleep(1 * time.Second)
Expand Down
13 changes: 13 additions & 0 deletions test/helpers/ghostferry_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,19 @@ def start_server
end
end

@server.mount_proc "/callbacks/state" do |req, resp|
begin
unless req.body
@server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send data")
resp.status = 400
@server.shutdown
end
data = JSON.parse(JSON.parse(req.body)["Payload"])
@callback_handlers["state"].each { |f| f.call(data) } unless @callback_handlers["state"].nil?
rescue StandardError
end
end

@server.mount_proc "/callbacks/error" do |req, resp|
@error = JSON.parse(JSON.parse(req.body)["Payload"])
end
Expand Down
15 changes: 15 additions & 0 deletions test/integration/callbacks_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,19 @@ def test_progress_callback
refute progress.last["ETA"].nil?
assert progress.last["TimeTaken"] > 0
end

def test_state_callback
seed_simple_database_with_single_table

ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" })
state = []
ghostferry.on_callback("state") do |state_data|
state << state_data
end

ghostferry.run

assert state.length >= 1
assert_basic_fields_exist_in_dumped_state(state.last)
end
end
5 changes: 5 additions & 0 deletions test/lib/go/integrationferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@ func NewStandardConfig() (*ghostferry.Config, error) {
}
config.ProgressReportFrequency = 500

config.StateCallback = ghostferry.HTTPCallback{
URI: fmt.Sprintf("http://localhost:%s/callbacks/state", integrationPort),
}
config.StateReportFrequency = 500

resumeStateJSON, err := ioutil.ReadAll(os.Stdin)
if err != nil {
return nil, err
Expand Down

0 comments on commit b75ccf3

Please sign in to comment.