Skip to content

Commit

Permalink
Changed CAS interface to a classic compare-and-set abstraction,
Browse files Browse the repository at this point in the history
retaining store-assigned informational version numbers
only for consistency checking.
CAS-based consensus now seems to be working finally!
  • Loading branch information
bford committed May 4, 2020
1 parent 7063788 commit d3af99b
Show file tree
Hide file tree
Showing 7 changed files with 321 additions and 264 deletions.
30 changes: 15 additions & 15 deletions go/lib/cas/cas.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ package cas

import (
"context"
"errors"
//"errors"
"sync"
)

Expand All @@ -31,6 +31,8 @@ import (
// If the stored state had already advanced past version number lastVer,
// CheckAndSet returns actualVer > lastVer, actualVal == the state value
// associated with actualVer, and err == Changed.
// The version number of the stored state may appear to increase at any time
// even when the associated value has not changed.
//
// If CheckAndSet returns any error other than Changed, then it may return
// actualVer == 0 and actualVal == "" to indicate the state couldn't be read.
Expand All @@ -46,36 +48,34 @@ import (
// can respond to cancellation requests and timeouts appropriately.
//
type Store interface {
CheckAndSet(ctx context.Context, lastVer int64, reqVal string) (
actualVer int64, actualVal string, err error)
CompareAndSet(ctx context.Context, old, new string) (
version int64, actual string, err error)
}

// Register implements a simple local-memory CAS register.
// It is thread-safe and ready for use on instantiation.
type Register struct {
mut sync.Mutex // for synchronizing accesses
ver int64 // the latest version number
val string // the latest value written
ver int64 // version number of the latest value
}

// CheckAndSet implements the Store interface for the CAS register.
func (r *Register) CheckAndSet(
ctx context.Context, lastVer int64, reqVal string) (
actualVer int64, actualVal string, err error) {
// CompareAndSet implements the Store interface for the CAS register.
func (r *Register) CompareAndSet(ctx context.Context, old, new string) (
version int64, actual string, err error) {

r.mut.Lock()
defer r.mut.Unlock()

// If the version doesn't match, just return the latest version.
if r.ver != lastVer {
return r.ver, r.val, Changed
// Update the value only if the current value is as expected.
if r.val == old {
r.ver, r.val = r.ver+1, new
}

// Write and return the new version
r.ver, r.val = lastVer+1, reqVal
// Return the actual new value, changed or not.
return r.ver, r.val, nil
}

// CheckAndSet returns Changed when the stored value was changed
// CompareAndSet returns Changed when the stored value was changed
// by someone else since the last version the caller indicated.
var Changed = errors.New("Version changed")
//var Changed = errors.New("Version changed")
75 changes: 43 additions & 32 deletions go/lib/cas/test/cas.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,31 @@ import (
// and checks all these observations for consistency.
//
type History struct {
vs []string // all history known to be committed so far
mut sync.Mutex // mutex protecting this reference order
hist map[int64]string // version-value map defining observed history
mut sync.Mutex // mutex protecting this reference order
}

// Observe records a version/value pair that was observed via a cas.Store,
// checks it for consistency against all prior recorded version/value pairs,
// Observe records an old/new value pair that was observed via a cas.Store,
// checks it for consistency against all prior recorded old/new value pairs,
// and reports any errors via testing context t.
//
func (to *History) Observe(t *testing.T, ver int64, val string) {
func (to *History) Observe(t *testing.T, version int64, value string) {
to.mut.Lock()
defer to.mut.Unlock()

// Ensure history slice is long enough
for ver >= int64(len(to.vs)) {
to.vs = append(to.vs, "")
// Create the successor map if it doesn't already exist
if to.hist == nil {
to.hist = make(map[int64]string)
}

// Check commit consistency across all concurrent clients
switch {
case to.vs[ver] == "":
to.vs[ver] = val
case to.vs[ver] != val:
t.Errorf("\nHistory inconsistency at %v:\nold: %+v\nnew: %+v",
ver, to.vs[ver], val)
// If there is any recorded successor to old, it had better be new.
if old, exist := to.hist[version]; exist && old != value {
t.Errorf("\nInconsistency:\n ver %v\n old %q\n new %q\n",
version, old, value)
}

// Record the new successor
to.hist[version] = value
}

// Checked wraps the provided CAS store with a consistency-checker
Expand All @@ -53,29 +53,37 @@ func (to *History) Observe(t *testing.T, ver int64, val string) {
// each goroutine should have its own Checked wrapper around that Store.
//
func Checked(t *testing.T, h *History, store cas.Store) cas.Store {
return &checkedStore{t, h, store, 0}
return &checkedStore{t, h, store}
}

type checkedStore struct {
t *testing.T
h *History
s cas.Store
lv int64
t *testing.T
h *History
s cas.Store
}

func (cs *checkedStore) CheckAndSet(
ctx context.Context, lastVer int64, reqVal string) (
actualVer int64, actualVal string, err error) {
func (cs *checkedStore) CompareAndSet(ctx context.Context, old, new string) (
version int64, actual string, err error) {

//if lastVer != cs.lv {
// cs.t.Errorf("Checked CAS store passed wrong last version")
//}

if lastVer != cs.lv {
cs.t.Errorf("Checked CAS store passed wrong last version")
if new == "" {
cs.t.Errorf("CompareAndSet: new value empty")
}
if new == old {
cs.t.Errorf("CompareAndSet: new value identical to old")
}

// Try to change old to new atomically.
version, actual, err = cs.s.CompareAndSet(ctx, old, new)

actualVer, actualVal, err = cs.s.CheckAndSet(ctx, lastVer, reqVal)
cs.h.Observe(cs.t, actualVer, actualVal)
// Record all actual version/value pairs we observe.
cs.h.Observe(cs.t, version, actual)

cs.lv = actualVer
return actualVer, actualVal, err
// Return the actual new value regardless.
return version, actual, err
}

// Stores torture-tests one or more cas.Store interfaces
Expand All @@ -91,12 +99,15 @@ func Stores(t *testing.T, nthreads, naccesses int, store ...cas.Store) {

tester := func(i, j int) {
cs := Checked(t, h, store[i])
var lastVer int64
old, err := "", error(nil)
for k := 0; k < naccesses; k++ {
reqVal := fmt.Sprintf("store %v thread %v access %v",
new := fmt.Sprintf("store %v thread %v access %v",
i, j, k)
//println("tester", i, j, "access", k)
lastVer, _, _ = cs.CheckAndSet(bg, lastVer, reqVal)
_, old, err = cs.CompareAndSet(bg, old, new)
if err != nil {
t.Error("CompareAndSet: " + err.Error())
}
}
//println("tester", i, j, "done")
wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion go/lib/cas/test/cas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ import (

// Test the Client with a trivial in-memory key/value Store implementation.
func TestRegister(t *testing.T) {
TestStore(t, 100, 100000, &cas.Register{})
Stores(t, 100, 100000, &cas.Register{})
}
Loading

0 comments on commit d3af99b

Please sign in to comment.