Skip to content

Commit

Permalink
util/file_reader: remove errChan
Browse files Browse the repository at this point in the history
  • Loading branch information
adamstruck committed Jun 19, 2018
1 parent d674719 commit 4db5307
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 55 deletions.
4 changes: 2 additions & 2 deletions cmd/info/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var Cmd = &cobra.Command{
return err
}
for row := range res {
fmt.Printf("Vertex Count: %s\n", row)
fmt.Printf("Vertex Count: %v\n", row.GetCount())
}

q = aql.E().Count()
Expand All @@ -39,7 +39,7 @@ var Cmd = &cobra.Command{
return err
}
for row := range res {
fmt.Printf("Edge Count: %s\n", row)
fmt.Printf("Edge Count: %v\n", row.GetCount())
}
return nil
},
Expand Down
49 changes: 17 additions & 32 deletions cmd/load/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,44 +91,29 @@ var Cmd = &cobra.Command{

if vertexFile != "" {
log.Printf("Loading %s", vertexFile)
verts, errs := util.StreamVerticesFromFile(vertexFile)
go func(verts chan *aql.Vertex) {
count := 0
for v := range verts {
count++
if count%1000 == 0 {
log.Printf("Loaded %d vertices", count)
}
elemChan <- &aql.GraphElement{Graph: graph, Vertex: v}
count := 0
for v := range util.StreamVerticesFromFile(vertexFile) {
count++
if count%1000 == 0 {
log.Printf("Loaded %d vertices", count)
}
log.Printf("Loaded %d vertices", count)
}(verts)
go func(errs chan error) {
for e := range errs {
log.Printf("Error loading vertices: %v", e)
}
}(errs)
elemChan <- &aql.GraphElement{Graph: graph, Vertex: v}
}
log.Printf("Loaded %d vertices", count)

}

if edgeFile != "" {
log.Printf("Loading %s", edgeFile)
edges, errs := util.StreamEdgesFromFile(edgeFile)
go func(edges chan *aql.Edge) {
count := 0
for e := range edges {
count++
if count%1000 == 0 {
log.Printf("Loaded %d edges", count)
}
elemChan <- &aql.GraphElement{Graph: graph, Edge: e}
count := 0
for e := range util.StreamEdgesFromFile(edgeFile) {
count++
if count%1000 == 0 {
log.Printf("Loaded %d edges", count)
}
log.Printf("Loaded %d edges", count)
}(edges)
go func(errs chan error) {
for e := range errs {
log.Printf("Error loading vertices: %v", e)
}
}(errs)
elemChan <- &aql.GraphElement{Graph: graph, Edge: e}
}
log.Printf("Loaded %d edges", count)
}

m := jsonpb.Unmarshaler{AllowUnknownFields: true}
Expand Down
4 changes: 2 additions & 2 deletions test/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ var edges = []*aql.Edge{}
func init() {
flag.StringVar(&configFile, "config", configFile, "config file to use for tests")
flag.Parse()
vertChan, _ := util.StreamVerticesFromFile("./resources/smtest_vertices.txt")
vertChan := util.StreamVerticesFromFile("./resources/smtest_vertices.txt")
for v := range vertChan {
vertices = append(vertices, v)
}
edgeChan, _ := util.StreamEdgesFromFile("./resources/smtest_edges.txt")
edgeChan := util.StreamEdgesFromFile("./resources/smtest_edges.txt")
for e := range edgeChan {
edges = append(edges, e)
}
Expand Down
36 changes: 17 additions & 19 deletions util/file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,26 @@ package util

import (
"bytes"
"fmt"
"io"
"log"

"github.com/bmeg/arachne/aql"
"github.com/bmeg/golib"
"github.com/golang/protobuf/jsonpb"
)

// StreamVerticesFromFile reads a file containing a vertex per line and
// streams *aql.Vertex objects and errors out on channels
func StreamVerticesFromFile(file string) (chan *aql.Vertex, chan error) {
vertChan := make(chan *aql.Vertex)
errChan := make(chan error)
// streams *aql.Vertex objects out on a channel
func StreamVerticesFromFile(file string) chan *aql.Vertex {
vertChan := make(chan *aql.Vertex, 100)

go func() {
defer close(vertChan)
defer close(errChan)

reader, err := golib.ReadFileLines(file)
if err != nil {
errChan <- fmt.Errorf("reading file: %v", err)
log.Printf("Error: reading file: %v", err)
return
}

m := jsonpb.Unmarshaler{AllowUnknownFields: true}
Expand All @@ -33,29 +32,28 @@ func StreamVerticesFromFile(file string) (chan *aql.Vertex, chan error) {
break
}
if err != nil {
errChan <- fmt.Errorf("unmarshaling vertex: %v", err)
continue
log.Printf("Error: unmarshaling vertex: %v", err)
return
}
vertChan <- v
}
}()

return vertChan, errChan
return vertChan
}

// StreamEdgesFromFile reads a file containing an edge per line and
// streams aql.Edge objects and errors out on channels
func StreamEdgesFromFile(file string) (chan *aql.Edge, chan error) {
edgeChan := make(chan *aql.Edge)
errChan := make(chan error)
// streams aql.Edge objects on a channel
func StreamEdgesFromFile(file string) chan *aql.Edge {
edgeChan := make(chan *aql.Edge, 100)

go func() {
defer close(edgeChan)
defer close(errChan)

reader, err := golib.ReadFileLines(file)
if err != nil {
errChan <- fmt.Errorf("reading file: %v", err)
log.Printf("Error: reading file: %v", err)
return
}

m := jsonpb.Unmarshaler{AllowUnknownFields: true}
Expand All @@ -66,12 +64,12 @@ func StreamEdgesFromFile(file string) (chan *aql.Edge, chan error) {
break
}
if err != nil {
errChan <- fmt.Errorf("unmarshaling edge: %v", err)
continue
log.Printf("Error: unmarshaling edge: %v", err)
return
}
edgeChan <- e
}
}()

return edgeChan, errChan
return edgeChan
}

0 comments on commit 4db5307

Please sign in to comment.