From 1b9d0266f395acfc6023322034deed8358aa26a7 Mon Sep 17 00:00:00 2001 From: John Edmonds Date: Fri, 12 Mar 2021 16:59:39 -0500 Subject: [PATCH] Close the graph columnar db iterator. (#4859) * Close the iterator. * Refactor loopBody into its own method. Co-authored-by: Shahms King --- kythe/go/serving/graph/columnar.go | 188 +++++++++++++++-------------- 1 file changed, 99 insertions(+), 89 deletions(-) diff --git a/kythe/go/serving/graph/columnar.go b/kythe/go/serving/graph/columnar.go index 37d3831858..c59e90d845 100644 --- a/kythe/go/serving/graph/columnar.go +++ b/kythe/go/serving/graph/columnar.go @@ -110,120 +110,130 @@ func (c *ColumnarTable) Nodes(ctx context.Context, req *gpb.NodesRequest) (*gpb. return reply, nil } -// Edges implements part of the graph.Service interface. -func (c *ColumnarTable) Edges(ctx context.Context, req *gpb.EdgesRequest) (*gpb.EdgesReply, error) { - // TODO(schroederc): implement edge paging - reply := &gpb.EdgesReply{ - EdgeSets: make(map[string]*gpb.EdgeSet, len(req.Ticket)), - Nodes: make(map[string]*cpb.NodeInfo), +// processTicket loads values associated with the search ticket and adds them to the reply. +func (c *ColumnarTable) processTicket(ctx context.Context, ticket string, patterns []*regexp.Regexp, allowedKinds stringset.Set, reply *gpb.EdgesReply) error { + srcURI, err := kytheuri.Parse(ticket) + if err != nil { + return err + } - // TODO(schroederc): TotalEdgesByKind: make(map[string]int64), + src := srcURI.VName() + prefix, err := keys.Append(columnar.EdgesKeyPrefix, src) + if err != nil { + return err } - patterns := xrefs.ConvertFilters(req.Filter) - allowedKinds := stringset.New(req.Kind...) - for _, ticket := range req.Ticket { - srcURI, err := kytheuri.Parse(ticket) - if err != nil { - return nil, err - } + it, err := c.DB.ScanPrefix(ctx, prefix, &keyvalue.Options{LargeRead: true}) + defer it.Close() + if err != nil { + return err + } - src := srcURI.VName() - prefix, err := keys.Append(columnar.EdgesKeyPrefix, src) - if err != nil { - return nil, err - } + k, val, err := it.Next() + if err == io.EOF || !bytes.Equal(k, prefix) { + return nil + } else if err != nil { + return err + } - it, err := c.DB.ScanPrefix(ctx, prefix, &keyvalue.Options{LargeRead: true}) - if err != nil { - return nil, err + // Decode Edges Index + var idx gspb.Edges_Index + if err := proto.Unmarshal(val, &idx); err != nil { + return fmt.Errorf("error decoding index: %v", err) + } + if len(patterns) > 0 { + if info := filterNode(patterns, idx.Node); len(info.Facts) > 0 { + reply.Nodes[ticket] = info } + } + + edges := &gpb.EdgeSet{Groups: make(map[string]*gpb.EdgeSet_Group)} + reply.EdgeSets[ticket] = edges + targets := stringset.New() + // Main loop to scan over each columnar kv entry. + for { k, val, err := it.Next() - if err == io.EOF || !bytes.Equal(k, prefix) { - continue + if err == io.EOF { + break } else if err != nil { - return nil, err + return err } + key := string(k[len(prefix):]) - // Decode Edges Index - var idx gspb.Edges_Index - if err := proto.Unmarshal(val, &idx); err != nil { - return nil, fmt.Errorf("error decoding index: %v", err) - } - if len(patterns) > 0 { - if info := filterNode(patterns, idx.Node); len(info.Facts) > 0 { - reply.Nodes[ticket] = info - } + // TODO(schroederc): only parse needed entries + e, err := columnar.DecodeEdgesEntry(src, key, val) + if err != nil { + return err } - edges := &gpb.EdgeSet{Groups: make(map[string]*gpb.EdgeSet_Group)} - reply.EdgeSets[ticket] = edges - targets := stringset.New() + switch e := e.Entry.(type) { + case *gspb.Edges_Edge_: + edge := e.Edge - // Main loop to scan over each columnar kv entry. - for { - k, val, err := it.Next() - if err == io.EOF { - break - } else if err != nil { - return nil, err + kind := edge.GetGenericKind() + if kind == "" { + kind = schema.EdgeKindString(edge.GetKytheKind()) + } + if edge.Reverse { + kind = "%" + kind } - key := string(k[len(prefix):]) - // TODO(schroederc): only parse needed entries - e, err := columnar.DecodeEdgesEntry(src, key, val) - if err != nil { - return nil, err + if len(allowedKinds) != 0 && !allowedKinds.Contains(kind) { + continue } - switch e := e.Entry.(type) { - case *gspb.Edges_Edge_: - edge := e.Edge + target := kytheuri.ToString(edge.Target) + targets.Add(target) - kind := edge.GetGenericKind() - if kind == "" { - kind = schema.EdgeKindString(edge.GetKytheKind()) - } - if edge.Reverse { - kind = "%" + kind - } + g := edges.Groups[kind] + if g == nil { + g = &gpb.EdgeSet_Group{} + edges.Groups[kind] = g + } + g.Edge = append(g.Edge, &gpb.EdgeSet_Group_Edge{ + TargetTicket: target, + Ordinal: edge.Ordinal, + }) + case *gspb.Edges_Target_: + if len(patterns) == 0 || len(targets) == 0 { + break + } - if len(allowedKinds) != 0 && !allowedKinds.Contains(kind) { - continue + target := e.Target + ticket := kytheuri.ToString(target.Node.Source) + if targets.Contains(ticket) { + if info := filterNode(patterns, target.Node); len(info.Facts) > 0 { + reply.Nodes[ticket] = info } + } + default: + return fmt.Errorf("unknown Edges entry: %T", e) + } + } - target := kytheuri.ToString(edge.Target) - targets.Add(target) + if len(edges.Groups) == 0 { + delete(reply.EdgeSets, ticket) + } + return nil +} - g := edges.Groups[kind] - if g == nil { - g = &gpb.EdgeSet_Group{} - edges.Groups[kind] = g - } - g.Edge = append(g.Edge, &gpb.EdgeSet_Group_Edge{ - TargetTicket: target, - Ordinal: edge.Ordinal, - }) - case *gspb.Edges_Target_: - if len(patterns) == 0 || len(targets) == 0 { - break - } +// Edges implements part of the graph.Service interface. +func (c *ColumnarTable) Edges(ctx context.Context, req *gpb.EdgesRequest) (*gpb.EdgesReply, error) { + // TODO(schroederc): implement edge paging + reply := &gpb.EdgesReply{ + EdgeSets: make(map[string]*gpb.EdgeSet, len(req.Ticket)), + Nodes: make(map[string]*cpb.NodeInfo), - target := e.Target - ticket := kytheuri.ToString(target.Node.Source) - if targets.Contains(ticket) { - if info := filterNode(patterns, target.Node); len(info.Facts) > 0 { - reply.Nodes[ticket] = info - } - } - default: - return nil, fmt.Errorf("unknown Edges entry: %T", e) - } - } + // TODO(schroederc): TotalEdgesByKind: make(map[string]int64), + } + patterns := xrefs.ConvertFilters(req.Filter) + allowedKinds := stringset.New(req.Kind...) - if len(edges.Groups) == 0 { - delete(reply.EdgeSets, ticket) + for _, ticket := range req.Ticket { + err := c.processTicket(ctx, ticket, patterns, allowedKinds, reply) + if err != nil { + return nil, err } }