Skip to content

Commit

Permalink
remove windows files properly; fix map locking
Browse files Browse the repository at this point in the history
  • Loading branch information
chrislusf committed Feb 28, 2016
1 parent 3876d2f commit 7ab33bd
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
13 changes: 10 additions & 3 deletions driver/scheduler/shard_location.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (

type DatasetShardLocator struct {
sync.Mutex
executableFileHash string
datasetShard2Location map[string]resource.Location
waitForAllInputs *sync.Cond
executableFileHash string
datasetShard2Location map[string]resource.Location
datasetShard2LocationLock sync.Mutex
waitForAllInputs *sync.Cond
}

func NewDatasetShardLocator(executableFileHash string) *DatasetShardLocator {
Expand All @@ -25,6 +26,9 @@ func NewDatasetShardLocator(executableFileHash string) *DatasetShardLocator {
}

func (l *DatasetShardLocator) GetShardLocation(shardName string) (resource.Location, bool) {
l.datasetShard2LocationLock.Lock()
defer l.datasetShard2LocationLock.Unlock()

loc, hasValue := l.datasetShard2Location[shardName]
return loc, hasValue
}
Expand All @@ -33,12 +37,15 @@ func (l *DatasetShardLocator) SetShardLocation(name string, location resource.Lo
l.Lock()
defer l.Unlock()

l.datasetShard2LocationLock.Lock()
defer l.datasetShard2LocationLock.Unlock()
// fmt.Printf("shard %s is at %s\n", name, location.URL())
l.datasetShard2Location[name] = location
l.waitForAllInputs.Broadcast()
}

func (l *DatasetShardLocator) allInputsAreRegistered(task *flow.Task) bool {

for _, input := range task.Inputs {
if _, hasValue := l.GetShardLocation(l.executableFileHash + "-" + input.Name()); !hasValue {
// fmt.Printf("%s's input %s is not ready\n", task.Name(), input.Name())
Expand Down
9 changes: 9 additions & 0 deletions netchan/store/rotating_file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,14 @@ func (l *RotatingFileStore) ReadAt(data []byte, offset int64) (int, error) {
return l.readFullFromOldSegmentsAt(data, offset)
}

// fmt.Printf("Read: l.Offset=%d, offset=%d\n", l.Offset, offset)

l.mu.Lock()
defer l.mu.Unlock()

// create the file does not exist
if l.file == nil {
// fmt.Printf("Read: creating new file...\n")
if err := l.openNew(); err != nil {
return 0, err
}
Expand All @@ -120,9 +123,11 @@ func (l *RotatingFileStore) ReadAt(data []byte, offset int64) (int, error) {

// wait for data not written yet
for offset == l.Position {
// fmt.Printf("Read: wait for reading...\n")
l.waitForReading.Wait()
}

// fmt.Printf("Read: file reading...\n")
return l.file.ReadAt(data, offset-l.Offset)
}

Expand Down Expand Up @@ -347,8 +352,12 @@ func deleteAll(dir string, files []logInfo) {

func (l *RotatingFileStore) Destroy() {
for _, seg := range l.Segments {
// println("removing segment:", seg.File.Name())
seg.File.Close()
os.Remove(seg.File.Name())
}
// println("removing file", l.filename())
l.Close()
os.Remove(l.filename())
}

Expand Down

0 comments on commit 7ab33bd

Please sign in to comment.