Skip to content

Commit

Permalink
Close the graph columnar db iterator. (kythe#4859)
Browse files Browse the repository at this point in the history
* Close the iterator.

* Refactor loopBody into its own method.

Co-authored-by: Shahms King <[email protected]>
  • Loading branch information
johnedmonds and shahms authored Mar 12, 2021
1 parent adb5eb5 commit 1b9d026
Showing 1 changed file with 99 additions and 89 deletions.
188 changes: 99 additions & 89 deletions kythe/go/serving/graph/columnar.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,120 +110,130 @@ func (c *ColumnarTable) Nodes(ctx context.Context, req *gpb.NodesRequest) (*gpb.
return reply, nil
}

// Edges implements part of the graph.Service interface.
func (c *ColumnarTable) Edges(ctx context.Context, req *gpb.EdgesRequest) (*gpb.EdgesReply, error) {
// TODO(schroederc): implement edge paging
reply := &gpb.EdgesReply{
EdgeSets: make(map[string]*gpb.EdgeSet, len(req.Ticket)),
Nodes: make(map[string]*cpb.NodeInfo),
// processTicket loads values associated with the search ticket and adds them to the reply.
func (c *ColumnarTable) processTicket(ctx context.Context, ticket string, patterns []*regexp.Regexp, allowedKinds stringset.Set, reply *gpb.EdgesReply) error {
srcURI, err := kytheuri.Parse(ticket)
if err != nil {
return err
}

// TODO(schroederc): TotalEdgesByKind: make(map[string]int64),
src := srcURI.VName()
prefix, err := keys.Append(columnar.EdgesKeyPrefix, src)
if err != nil {
return err
}
patterns := xrefs.ConvertFilters(req.Filter)
allowedKinds := stringset.New(req.Kind...)

for _, ticket := range req.Ticket {
srcURI, err := kytheuri.Parse(ticket)
if err != nil {
return nil, err
}
it, err := c.DB.ScanPrefix(ctx, prefix, &keyvalue.Options{LargeRead: true})
defer it.Close()
if err != nil {
return err
}

src := srcURI.VName()
prefix, err := keys.Append(columnar.EdgesKeyPrefix, src)
if err != nil {
return nil, err
}
k, val, err := it.Next()
if err == io.EOF || !bytes.Equal(k, prefix) {
return nil
} else if err != nil {
return err
}

it, err := c.DB.ScanPrefix(ctx, prefix, &keyvalue.Options{LargeRead: true})
if err != nil {
return nil, err
// Decode Edges Index
var idx gspb.Edges_Index
if err := proto.Unmarshal(val, &idx); err != nil {
return fmt.Errorf("error decoding index: %v", err)
}
if len(patterns) > 0 {
if info := filterNode(patterns, idx.Node); len(info.Facts) > 0 {
reply.Nodes[ticket] = info
}
}

edges := &gpb.EdgeSet{Groups: make(map[string]*gpb.EdgeSet_Group)}
reply.EdgeSets[ticket] = edges
targets := stringset.New()

// Main loop to scan over each columnar kv entry.
for {
k, val, err := it.Next()
if err == io.EOF || !bytes.Equal(k, prefix) {
continue
if err == io.EOF {
break
} else if err != nil {
return nil, err
return err
}
key := string(k[len(prefix):])

// Decode Edges Index
var idx gspb.Edges_Index
if err := proto.Unmarshal(val, &idx); err != nil {
return nil, fmt.Errorf("error decoding index: %v", err)
}
if len(patterns) > 0 {
if info := filterNode(patterns, idx.Node); len(info.Facts) > 0 {
reply.Nodes[ticket] = info
}
// TODO(schroederc): only parse needed entries
e, err := columnar.DecodeEdgesEntry(src, key, val)
if err != nil {
return err
}

edges := &gpb.EdgeSet{Groups: make(map[string]*gpb.EdgeSet_Group)}
reply.EdgeSets[ticket] = edges
targets := stringset.New()
switch e := e.Entry.(type) {
case *gspb.Edges_Edge_:
edge := e.Edge

// Main loop to scan over each columnar kv entry.
for {
k, val, err := it.Next()
if err == io.EOF {
break
} else if err != nil {
return nil, err
kind := edge.GetGenericKind()
if kind == "" {
kind = schema.EdgeKindString(edge.GetKytheKind())
}
if edge.Reverse {
kind = "%" + kind
}
key := string(k[len(prefix):])

// TODO(schroederc): only parse needed entries
e, err := columnar.DecodeEdgesEntry(src, key, val)
if err != nil {
return nil, err
if len(allowedKinds) != 0 && !allowedKinds.Contains(kind) {
continue
}

switch e := e.Entry.(type) {
case *gspb.Edges_Edge_:
edge := e.Edge
target := kytheuri.ToString(edge.Target)
targets.Add(target)

kind := edge.GetGenericKind()
if kind == "" {
kind = schema.EdgeKindString(edge.GetKytheKind())
}
if edge.Reverse {
kind = "%" + kind
}
g := edges.Groups[kind]
if g == nil {
g = &gpb.EdgeSet_Group{}
edges.Groups[kind] = g
}
g.Edge = append(g.Edge, &gpb.EdgeSet_Group_Edge{
TargetTicket: target,
Ordinal: edge.Ordinal,
})
case *gspb.Edges_Target_:
if len(patterns) == 0 || len(targets) == 0 {
break
}

if len(allowedKinds) != 0 && !allowedKinds.Contains(kind) {
continue
target := e.Target
ticket := kytheuri.ToString(target.Node.Source)
if targets.Contains(ticket) {
if info := filterNode(patterns, target.Node); len(info.Facts) > 0 {
reply.Nodes[ticket] = info
}
}
default:
return fmt.Errorf("unknown Edges entry: %T", e)
}
}

target := kytheuri.ToString(edge.Target)
targets.Add(target)
if len(edges.Groups) == 0 {
delete(reply.EdgeSets, ticket)
}
return nil
}

g := edges.Groups[kind]
if g == nil {
g = &gpb.EdgeSet_Group{}
edges.Groups[kind] = g
}
g.Edge = append(g.Edge, &gpb.EdgeSet_Group_Edge{
TargetTicket: target,
Ordinal: edge.Ordinal,
})
case *gspb.Edges_Target_:
if len(patterns) == 0 || len(targets) == 0 {
break
}
// Edges implements part of the graph.Service interface.
func (c *ColumnarTable) Edges(ctx context.Context, req *gpb.EdgesRequest) (*gpb.EdgesReply, error) {
// TODO(schroederc): implement edge paging
reply := &gpb.EdgesReply{
EdgeSets: make(map[string]*gpb.EdgeSet, len(req.Ticket)),
Nodes: make(map[string]*cpb.NodeInfo),

target := e.Target
ticket := kytheuri.ToString(target.Node.Source)
if targets.Contains(ticket) {
if info := filterNode(patterns, target.Node); len(info.Facts) > 0 {
reply.Nodes[ticket] = info
}
}
default:
return nil, fmt.Errorf("unknown Edges entry: %T", e)
}
}
// TODO(schroederc): TotalEdgesByKind: make(map[string]int64),
}
patterns := xrefs.ConvertFilters(req.Filter)
allowedKinds := stringset.New(req.Kind...)

if len(edges.Groups) == 0 {
delete(reply.EdgeSets, ticket)
for _, ticket := range req.Ticket {
err := c.processTicket(ctx, ticket, patterns, allowedKinds, reply)
if err != nil {
return nil, err
}
}

Expand Down

0 comments on commit 1b9d026

Please sign in to comment.