Skip to content

Commit

Permalink
New CAS-based Store interface and QSCOD consensus wrapper
Browse files Browse the repository at this point in the history
that both builds on it as the underlying node state interface
and presents it as the interface to the logical consensus group.
  • Loading branch information
bford committed Apr 30, 2020
1 parent 88d8192 commit f5c4d55
Show file tree
Hide file tree
Showing 9 changed files with 506 additions and 73 deletions.
67 changes: 49 additions & 18 deletions go/lib/backoff/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,23 @@
package backoff

import (
"time"
"context"
"log"
"math/rand"
"time"
)

// Retry calls try() repeatedly until it returns without an error,
// with the default exponential backoff configuration.
func Retry(try func() error) {
Config{}.Retry(try)
//
// By default, Retry continues to try forever until it succeeds.
// The caller may pass a cancelable context in the ctx parameter, however,
// in case Retry will give up calling try when the context is cancelled.
// If the context was already cancelled on the call to Retry,
// then Retry returns ctx.Err() immediately without calling try.
//
func Retry(ctx context.Context, try func() error) error {
return Config{}.Retry(ctx, try)
}

// Config represents configuration parameters for exponential backoff.
Expand All @@ -22,35 +30,49 @@ func Retry(try func() error) {
// Report, if non-nil, is a function called by Retry to report errors
// in an appropriate fashion specific to the application.
// If nil, Retry reports errors via log.Println by default.
// Report may also call panic() to abort the Retry loop and unwind the stack
// if it determines that a detected error is permanent and waiting cannot help.
// Report may also return a non-nil error to abort the Retry loop if it
// determines that the detected error is permanent and waiting will not help.
//
type Config struct {
Report func(error) // Function to report errors
MaxWait time.Duration // Maximum backoff wait period
Report func(error) error // Function to report errors
MaxWait time.Duration // Maximum backoff wait period

mayGrow struct{} // Ensure Config remains extensible
mayGrow struct{} // Ensure Config remains extensible
}

func defaultReport(err error) error {
log.Println(err.Error())
return nil
}

// Retry calls try() repeatedly until it returns without an error,
// with exponential backoff configuration c.
func (c Config) Retry(try func() error) {
// using exponential backoff configuration c.
func (c Config) Retry(ctx context.Context, try func() error) error {

if c.Report == nil { // Default error reporter
c.Report = func(err error) { log.Println(err.Error()) }
// Make sure we have a valid error reporter
if c.Report == nil {
c.Report = defaultReport
}

backoff := time.Duration(1) // minimum backoff duration
// Return immediately if ctx was already cancelled
if ctx.Err() != nil {
return ctx.Err()
}

backoff := time.Duration(1) // minimum backoff duration
for {
before := time.Now()
err := try()
if err == nil { // success
return
if err == nil { // success
return nil
}
elapsed := time.Since(before)

// Report the error as appropriate
c.Report(err)
err = c.Report(err)
if err != nil {
return err // abort the retry loop
}

// Wait for an exponentially-growing random backoff period,
// with the duration of each operation attempt as the minimum
Expand All @@ -61,7 +83,16 @@ func (c Config) Retry(try func() error) {
if c.MaxWait > 0 && backoff > c.MaxWait {
backoff = c.MaxWait
}
time.Sleep(backoff)

// Wait for either the backoff timer or a cancel signal.
t := time.NewTimer(backoff)
select {
case <-t.C: // Backoff timer expired
continue

case <-ctx.Done(): // Our context got cancelled
t.Stop()
return ctx.Err()
}
}
}

27 changes: 25 additions & 2 deletions go/lib/backoff/retry_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package backoff

import (
"testing"
"context"
"errors"
"fmt"
"testing"
"time"
)

func TestRetry(t *testing.T) {
Expand All @@ -16,6 +18,27 @@ func TestRetry(t *testing.T) {
}
return nil
}
Retry(try)
Retry(context.Background(), try)
}

func TestTimeout(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
try := func() error {
return errors.New("haha, never going to succeed")
}
if err := Retry(ctx, try); err != context.DeadlineExceeded {
t.Errorf("got wrong error from Retry: %v", err.Error())
}

// Now test with an already-cancelled context
try = func() error {
panic("shouldn't get here!")
}
if err := Retry(ctx, try); err != context.DeadlineExceeded {
t.Errorf("got wrong error from Retry: %v", err.Error())
}

// for good measure
cancel()
}
81 changes: 81 additions & 0 deletions go/lib/cas/cas.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Package cas defines a simple versioned check-and-set (CAS) state interface.
// It defines a generic access interface called Store,
// and a simple in-memory CAS register called Register.
//
// This CAS abstraction is functionally equivalent to classic compare-and-swap,
// but slightly more efficient and robust because it compares version numbers,
// which are always small and guaranteed to increase with each state change,
// rather than comparing actual state contents, which are arbitrary strings.
//
package cas

import (
"context"
"errors"
"sync"
)

// Store defines a CAS storage abstraction via a single CheckAndSet method.
//
// CheckAndSet writes a new version of the state containing value reqVal,
// provided the state has not changed since prior version lastVer.
// CheckAndSet then reads and returns the latest state version and value.
// The version check and conditional write are guaranteed to be atomic,
// ensuring that the caller can avoid undetected state loss due to races.
//
// When CheckAndSet succeeds in writing the caller's proposed value reqVal,
// it returns actualVer > lastVer, actualVal == reqVal, and err == nil.
// The Store assigns new version numbers, which must be increasing
// but need not be consecutive.
//
// If the stored state had already advanced past version number lastVer,
// CheckAndSet returns actualVer > lastVer, actualVal == the state value
// associated with actualVer, and err == Changed.
//
// If CheckAndSet returns any error other than Changed, then it may return
// actualVer == 0 and actualVal == "" to indicate the state couldn't be read.
//
// Version numbers are 64-bit integers, and values are arbitrary Go strings.
// Value strings may contain binary data; the Store treats them as opaque.
// While values in principle have no particular length limit, in practice
// Store implementations may expect them to be "reasonably small", i.e.,
// efficient for storing metadata but not necessarily for bulk data storage.
//
// CheckAndSet takes a Context parameter so that long-running implementations,
// particularly those accessing remote storage in a distributed system,
// can respond to cancellation requests and timeouts appropriately.
//
type Store interface {
CheckAndSet(ctx context.Context, lastVer int64, reqVal string) (
actualVer int64, actualVal string, err error)
}

// Register implements a simple local-memory CAS register.
// It is thread-safe and ready for use on instantiation.
type Register struct {
mut sync.Mutex // for synchronizing accesses
ver int64 // the latest version number
val string // the latest value written
}

// CheckAndSet implements the Store interface for the CAS register.
func (r *Register) CheckAndSet(
ctx context.Context, lastVer int64, reqVal string) (
actualVer int64, actualVal string, err error) {

r.mut.Lock()
defer r.mut.Unlock()

// If the version doesn't match, just return the latest version.
if r.ver != lastVer {
return r.ver, r.val, Changed
}

// Write and return the new version
r.ver, r.val = lastVer+1, reqVal
return r.ver, r.val, nil
}

// CheckAndSet returns Changed when the stored value was changed
// by someone else since the last version the caller indicated.
var Changed = errors.New("Version changed")
114 changes: 114 additions & 0 deletions go/lib/cas/test/cas.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Package test implements shareable code for testing instantiations
// of the cas.Store check-and-set storage interface.
package test

import (
"context"
"fmt"
"sync"
"testing"

"github.com/dedis/tlc/go/lib/cas"
)

// History records a history of cas.Store version/value observations,
// typically made across concurrent goroutines or even distributed nodes,
// and checks all these observations for consistency.
//
type History struct {
vs []string // all history known to be committed so far
mut sync.Mutex // mutex protecting this reference order
}

// Observe records a version/value pair that was observed via a cas.Store,
// checks it for consistency against all prior recorded version/value pairs,
// and reports any errors via testing context t.
//
func (to *History) Observe(t *testing.T, ver int64, val string) {
to.mut.Lock()
defer to.mut.Unlock()

// Ensure history slice is long enough
for ver >= int64(len(to.vs)) {
to.vs = append(to.vs, "")
}

// Check commit consistency across all concurrent clients
switch {
case to.vs[ver] == "":
to.vs[ver] = val
case to.vs[ver] != val:
t.Errorf("History inconsistency at %v:\nold: %+v\nnew: %+v",
ver, to.vs[ver], val)
}
}

// Checked wraps the provided CAS store with a consistency-checker
// that records all requested and observed accesses against history h,
// reporting any inconsistency errors discovered via testing context t.
//
// The wrapper also consistency-checks the caller's accesses to the Store,
// ensuring that the provided lastVer is indeed the last version retrieved.
// This means that when checking a Store that is shared across goroutines,
// each goroutine should have its own Checked wrapper around that Store.
//
func Checked(t *testing.T, h *History, store cas.Store) cas.Store {
return &checkedStore{t, h, store, 0}
}

type checkedStore struct {
t *testing.T
h *History
s cas.Store
lv int64
}

func (cs *checkedStore) CheckAndSet(
ctx context.Context, lastVer int64, reqVal string) (
actualVer int64, actualVal string, err error) {

if lastVer != cs.lv {
cs.t.Errorf("Checked CAS store passed wrong last version")
}

actualVer, actualVal, err = cs.s.CheckAndSet(ctx, lastVer, reqVal)
cs.h.Observe(cs.t, actualVer, actualVal)

cs.lv = actualVer
return actualVer, actualVal, err
}

// Stores torture-tests one or more cas.Store interfaces
// that are all supposed to represent the same consistent underlying state.
// The test is driven by nthreads goroutines per Store interface,
// each of which performs naccesses CAS operations on its interface.
//
func Stores(t *testing.T, nthreads, naccesses int, store ...cas.Store) {

bg := context.Background()
wg := sync.WaitGroup{}
h := &History{}

tester := func(i, j int) {
cs := Checked(t, h, store[i])
var lastVer int64
for k := 0; k < naccesses; k++ {
reqVal := fmt.Sprintf("store %v thread %v access %v",
i, j, k)
lastVer, _, _ = cs.CheckAndSet(bg, lastVer, reqVal)
}
wg.Done()
}

// Launch a set of goroutines for each Store interface.
// To maximize cross-store concurrency,
// launch the first thread per store, then the second per store, etc.
for j := 0; j < nthreads; j++ {
for i := range store {
wg.Add(1)
go tester(i, j)
}
}

// Wait for all tester goroutines to complete
}
12 changes: 12 additions & 0 deletions go/lib/cas/test/cas_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package test

import (
"testing"

"github.com/dedis/tlc/go/lib/cas"
)

// Test the Client with a trivial in-memory key/value Store implementation.
func TestRegister(t *testing.T) {
TestStore(t, 100, 100000, &cas.Register{})
}
Loading

0 comments on commit f5c4d55

Please sign in to comment.