Skip to content

Commit

Permalink
clean locking
Browse files Browse the repository at this point in the history
  • Loading branch information
chrislusf committed Nov 4, 2015
1 parent d94e9f1 commit 897730f
Showing 1 changed file with 18 additions and 21 deletions.
39 changes: 18 additions & 21 deletions agent/named_datasore.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,21 @@ import (
)

type ManagedDatasetShards struct {
sync.Mutex
dir string
port int
name2Store map[string]store.DataStore
name2StoreCond *sync.Cond
}

func NewManagedDatasetShards(dir string, port int) *ManagedDatasetShards {
var lock sync.Mutex
return &ManagedDatasetShards{
dir: dir,
port: port,
name2Store: make(map[string]store.DataStore),
name2StoreCond: sync.NewCond(&lock),
m := &ManagedDatasetShards{
dir: dir,
port: port,
name2Store: make(map[string]store.DataStore),
}
m.name2StoreCond = sync.NewCond(m)
return m
}

func (m *ManagedDatasetShards) doDelete(name string) {
Expand All @@ -39,16 +40,18 @@ func (m *ManagedDatasetShards) doDelete(name string) {

func (m *ManagedDatasetShards) DeleteNamedDatasetShard(name string) {

m.name2StoreCond.L.Lock()
defer m.name2StoreCond.L.Unlock()
m.Lock()
defer m.Unlock()

m.doDelete(name)

}

func (m *ManagedDatasetShards) CreateNamedDatasetShard(name string) store.DataStore {

m.name2StoreCond.L.Lock()
m.Lock()
defer m.Unlock()

_, ok := m.name2Store[name]
if ok {
m.doDelete(name)
Expand All @@ -57,36 +60,30 @@ func (m *ManagedDatasetShards) CreateNamedDatasetShard(name string) store.DataSt
s, err := store.NewLocalFileDataStore(m.dir, fmt.Sprintf("%s-%d", name, m.port))
if err != nil {
log.Printf("Failed to create a queue on disk: %v", err)
m.name2StoreCond.L.Unlock()
return nil
}

m.name2Store[name] = s
// println(name, "is broadcasting...")
m.name2StoreCond.Broadcast()

m.name2StoreCond.L.Unlock()

return s

}

func (m *ManagedDatasetShards) WaitForNamedDatasetShard(name string) store.DataStore {

var ds store.DataStore
var ok bool
m.Lock()
defer m.Unlock()

m.name2StoreCond.L.Lock()
for {
ds, ok = m.name2Store[name]
if ok {
break
if ds, ok := m.name2Store[name]; ok {
return ds
}
println(name, "is waiting to read...")
// println(name, "is waiting to read...")
m.name2StoreCond.Wait()
}
m.name2StoreCond.L.Unlock()

return ds
return nil

}

0 comments on commit 897730f

Please sign in to comment.