Skip to content

Commit

Permalink
some more work on debugging issues in event streams (bluesky-social#127)
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping authored Apr 21, 2023
2 parents 22c94f0 + 137ea43 commit 8c8248d
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 34 deletions.
5 changes: 3 additions & 2 deletions api/atproto/syncsubscribeRepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ func init() {
}

type SyncSubscribeRepos_Commit struct {
Blobs []util.LexLink `json:"blobs" cborgen:"blobs"`
Blocks util.LexBytes `json:"blocks" cborgen:"blocks"`
Blobs []util.LexLink `json:"blobs" cborgen:"blobs"`
// TODO: fix lexgen so that LexBytes fields get 'omitempty'; for now this tag is edited by hand
Blocks util.LexBytes `json:"blocks,omitempty" cborgen:"blocks"`
Commit util.LexLink `json:"commit" cborgen:"commit"`
Ops []*SyncSubscribeRepos_RepoOp `json:"ops" cborgen:"ops"`
Prev *util.LexLink `json:"prev" cborgen:"prev"`
Expand Down
26 changes: 18 additions & 8 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,18 +357,28 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event

if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, (*cid.Cid)(evt.Prev), evt.Blocks); err != nil {
log.Warnw("failed handling event", "err", err, "host", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())
if !errors.Is(err, carstore.ErrRepoBaseMismatch) {
return fmt.Errorf("handle user event failed: %w", err)
}

ai, err := bgs.Index.LookupUser(ctx, u.ID)
if err != nil {
return err
if errors.Is(err, carstore.ErrRepoBaseMismatch) {
ai, err := bgs.Index.LookupUser(ctx, u.ID)
if err != nil {
return err
}

span.SetAttributes(attribute.Bool("catchup_queue", true))

return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
}

span.SetAttributes(attribute.Bool("catchup_queue", true))
if errors.Is(err, carstore.ErrRepoFork) {
log.Errorw("detected repo fork", "from", stringLink(evt.Prev), "host", host.Host, "repo", u.Did)

span.SetAttributes(attribute.Bool("catchup_queue", true))
span.SetAttributes(attribute.Bool("fork", true))

return fmt.Errorf("cannot process repo fork")
}

return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
return fmt.Errorf("handle user event failed: %w", err)
}

// sync blobs
Expand Down
34 changes: 34 additions & 0 deletions carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ func (cs *CarStore) getLastShard(ctx context.Context, user util.Uid) (*CarShard,

// Sentinel errors wrapped by NewDeltaSession when the caller-supplied previous
// head does not match the root of the user's last shard. Match them with
// errors.Is.
var (
	// ErrRepoBaseMismatch is used when prev differs from the last shard's
	// root and no fork was detected.
	ErrRepoBaseMismatch = fmt.Errorf("attempted a delta session on top of the wrong previous head")

	// ErrRepoFork is used when checkFork reports that prev is a fork point.
	ErrRepoFork = fmt.Errorf("repo fork detected")
)

func (cs *CarStore) NewDeltaSession(ctx context.Context, user util.Uid, prev *cid.Cid) (*DeltaSession, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession")
defer span.End()
Expand All @@ -291,6 +293,15 @@ func (cs *CarStore) NewDeltaSession(ctx context.Context, user util.Uid, prev *ci

if prev != nil {
if lastShard.Root.CID != *prev {
fork, err := cs.checkFork(ctx, user, *prev)
if err != nil {
return nil, fmt.Errorf("failed to check carstore base mismatch for fork condition: %w", err)
}

if fork {
return nil, fmt.Errorf("fork at %s: %w", prev.String(), ErrRepoFork)
}

return nil, fmt.Errorf("mismatch: %s != %s: %w", lastShard.Root.CID, prev.String(), ErrRepoBaseMismatch)
}
}
Expand Down Expand Up @@ -678,3 +689,26 @@ func (cs *CarStore) Stat(ctx context.Context, usr util.Uid) ([]UserStat, error)

return out, nil
}

// checkFork reports whether prev is the root of a shard this user already has
// stored that is not the shard returned by getLastShard. NewDeltaSession uses
// a true result to distinguish a fork from an ordinary base mismatch.
//
// Returns:
//   - (false, nil) when no shard with root prev is found for the user;
//   - (true, nil) when such a shard exists and is not the last shard;
//   - an error when either lookup fails, or when prev turns out to be the
//     root of the last shard (a valid append, which callers should never be
//     asking about).
func (cs *CarStore) checkFork(ctx context.Context, user util.Uid, prev cid.Cid) (bool, error) {
	lastShard, err := cs.getLastShard(ctx, user)
	if err != nil {
		return false, err
	}

	var maybeShard CarShard
	if err := cs.meta.WithContext(ctx).Model(CarShard{}).Find(&maybeShard, "usr = ? AND root = ?", user, &util.DbCID{prev}).Error; err != nil {
		return false, err
	}

	// A zero ID means the query matched nothing (Find does not return an
	// error for an empty result). This has to be tested before the comparison
	// with lastShard below: if lastShard also carries a zero ID
	// (NOTE(review): presumably what getLastShard yields for a user with no
	// shards yet - confirm), the two zero IDs would compare equal and a plain
	// "not found" would be misreported as a broken invariant.
	if maybeShard.ID == 0 {
		return false, nil
	}

	if maybeShard.ID == lastShard.ID {
		// somehow we are checking if a valid 'append' is a fork, seems buggy, throw an error
		return false, fmt.Errorf("invariant broken: checked for forkiness of a valid append")
	}

	return true, nil
}
55 changes: 33 additions & 22 deletions cmd/gosky/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/lex/util"
"github.com/bluesky-social/indigo/repo"
"github.com/bluesky-social/indigo/repomgr"

Expand Down Expand Up @@ -143,6 +144,14 @@ type eventInfo struct {
LastSeq int64
}

// cidStr renders an optional link for display: the link's String() form when
// it is set, or the placeholder "<nil>" when c is nil.
func cidStr(c *util.LexLink) string {
	if c != nil {
		return c.String()
	}

	return "<nil>"
}

var debugStreamCmd = &cli.Command{
Name: "debug-stream",
Flags: []cli.Flag{
Expand Down Expand Up @@ -177,35 +186,37 @@ var debugStreamCmd = &cli.Command{

fmt.Printf("\rChecking seq: %d ", evt.Seq)

r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
if err != nil {
fmt.Printf("\nEvent at sequence %d had an invalid repo slice: %s\n", evt.Seq, err)
return nil
} else {
prev, err := r.PrevCommit(ctx)
if !evt.TooBig {
r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
if err != nil {
return err
}

var cs, es string
if prev != nil {
cs = prev.String()
}

if evt.Prev != nil {
es = evt.Prev.String()
}

if cs != es {
fmt.Printf("\nEvent at sequence %d has mismatch between slice prev and struct prev: %s != %s\n", evt.Seq, prev, evt.Prev)
fmt.Printf("\nEvent at sequence %d had an invalid repo slice: %s\n", evt.Seq, err)
return nil
} else {
prev, err := r.PrevCommit(ctx)
if err != nil {
return err
}

var cs, es string
if prev != nil {
cs = prev.String()
}

if evt.Prev != nil {
es = evt.Prev.String()
}

if cs != es {
fmt.Printf("\nEvent at sequence %d has mismatch between slice prev and struct prev: %s != %s\n", evt.Seq, prev, evt.Prev)
}
}
}

cur, ok := infos[evt.Repo]
if ok {
if cur.LastCid.String() != evt.Prev.String() {
if cur.LastCid.String() != cidStr(evt.Prev) {
fmt.Println()
fmt.Printf("Event at sequence %d, repo=%s had prev=%s head=%s, but last commit we saw was %s (seq=%d)\n", evt.Seq, evt.Repo, evt.Prev.String(), evt.Commit.String(), evt.Commit.String(), cur.LastSeq)
fmt.Printf("Event at sequence %d, repo=%s had prev=%s head=%s, but last commit we saw was %s (seq=%d)\n", evt.Seq, evt.Repo, cidStr(evt.Prev), evt.Commit.String(), cur.LastCid, cur.LastSeq)
}
}

Expand Down
159 changes: 159 additions & 0 deletions cmd/gosky/streamdiff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package main

import (
"context"
"fmt"
"net/http"

comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/events"
"github.com/gorilla/websocket"
cli "github.com/urfave/cli/v2"
)

// streamCompareCmd implements `diff-stream <hostA> <hostB>`: it subscribes to
// com.atproto.sync.subscribeRepos on both hosts and hands every commit event
// to a streamDiffer (PushA for the first host, PushB for the second).
//
// The two positional arguments are used verbatim as the URL prefix for the
// websocket dial, so they must include the scheme. The command returns once
// both streams have terminated.
//
// TODO: WIP - turns out to be more complicated than i initially thought
var streamCompareCmd = &cli.Command{
	Name:  "diff-stream",
	Flags: []cli.Flag{},
	Action: func(cctx *cli.Context) error {
		hosta := cctx.Args().Get(0)
		hostb := cctx.Args().Get(1)
		if hosta == "" || hostb == "" {
			return fmt.Errorf("must specify two hosts to compare")
		}

		d := websocket.DefaultDialer

		cona, _, err := d.Dial(fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", hosta), http.Header{})
		if err != nil {
			return fmt.Errorf("dial failure: %w", err)
		}

		conb, _, err := d.Dial(fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", hostb), http.Header{})
		if err != nil {
			// Don't leak the first connection; its close error is not
			// interesting next to the dial failure we are reporting.
			_ = cona.Close()
			return fmt.Errorf("dial failure: %w", err)
		}

		sd := &streamDiffer{}
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		// Buffered so that neither stream goroutine can block on exit.
		done := make(chan struct{}, 2)

		// run consumes one stream until it ends. The error is kept local to
		// each goroutine; sharing one err variable between the two streams
		// would be a data race.
		run := func(name string, con *websocket.Conn, push func(*events.XRPCStreamEvent)) {
			defer func() { done <- struct{}{} }()

			err := events.HandleRepoStream(ctx, con, &events.RepoStreamCallbacks{
				RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
					push(&events.XRPCStreamEvent{
						RepoCommit: evt,
					})
					return nil
				},
				RepoInfo: func(evt *comatproto.SyncSubscribeRepos_Info) error {
					return nil
				},
				// TODO: all the other Repo* event types
				Error: func(evt *events.ErrorFrame) error {
					return fmt.Errorf("%s: %s", evt.Error, evt.Message)
				},
			})
			if err != nil {
				log.Errorf("stream %s failed: %s", name, err)
			}
		}

		go run("A", cona, sd.PushA)
		go run("B", conb, sd.PushB)

		// Wait for both streams to finish rather than blocking forever.
		<-done
		<-done

		return nil
	},
}

// streamDiffer buffers events from two repo event streams so they can be
// compared against each other.
type streamDiffer struct {
	// Aevts holds events pushed via PushA that had no counterpart in Bevts at
	// the time; Bevts is the list PushA searches for that counterpart.
	Aevts, Bevts []*events.XRPCStreamEvent
}

// PushA handles an event received on stream A.
//
// If no matching event (as decided by findEvt) is present in sd.Bevts, the
// event is appended to sd.Aevts and nothing else happens. Otherwise, for
// commit events, the CAR slice lengths of the two events are compared and a
// line is printed to stdout when they differ. Matched events of other kinds
// are currently ignored.
func (sd *streamDiffer) PushA(evt *events.XRPCStreamEvent) {
	ix := findEvt(evt, sd.Bevts)
	if ix < 0 {
		sd.Aevts = append(sd.Aevts, evt)
		return
	}

	if evtOp(evt) != "#commit" {
		return
	}

	e := evt.RepoCommit
	oe := sd.Bevts[ix].RepoCommit

	if len(e.Blocks) != len(oe.Blocks) {
		// Terminate the report with a newline so consecutive mismatches do
		// not run together on one line.
		fmt.Printf("seq %d (A) and seq %d (B) have different carslice lengths: %d != %d\n", e.Seq, oe.Seq, len(e.Blocks), len(oe.Blocks))
	}
}

// PushB is the stream-B counterpart of PushA. It is currently an empty stub:
// the event is discarded and neither sd.Aevts nor sd.Bevts is touched.
func (sd *streamDiffer) PushB(evt *events.XRPCStreamEvent) {

}

// evtOp names the kind of an event based on which payload field is set.
// Fields are tested in a fixed order (Error, RepoCommit, RepoHandle, RepoInfo,
// RepoMigrate, RepoTombstone) and the first non-nil one wins; "unknown" is
// returned when none of them is set.
func evtOp(evt *events.XRPCStreamEvent) string {
	if evt.Error != nil {
		return "ERROR"
	}
	if evt.RepoCommit != nil {
		return "#commit"
	}
	if evt.RepoHandle != nil {
		return "#handle"
	}
	if evt.RepoInfo != nil {
		return "#info"
	}
	if evt.RepoMigrate != nil {
		return "#migrate"
	}
	if evt.RepoTombstone != nil {
		return "#tombstone"
	}

	return "unknown"
}

// sameCommit reports whether two commit events are for the same repo and
// carry the same prev link (a nil prev only equals another nil prev, since
// both render as "<nil>").
func sameCommit(a, b *comatproto.SyncSubscribeRepos_Commit) bool {
	if a.Repo != b.Repo {
		return false
	}

	return cidStr(a.Prev) == cidStr(b.Prev)
}

// findEvt returns the index of the first event in list that corresponds to
// evt, or -1 when there is none.
//
// Only entries whose evtOp matches evt's are considered. Commit events are
// matched with sameCommit. Any other event kind panics as soon as a
// same-kind entry is encountered, because comparison for those kinds has not
// been implemented.
func findEvt(evt *events.XRPCStreamEvent, list []*events.XRPCStreamEvent) int {
	kind := evtOp(evt)

	for idx := range list {
		candidate := list[idx]
		if evtOp(candidate) != kind {
			continue
		}

		if evt.RepoCommit != nil {
			if sameCommit(evt.RepoCommit, candidate.RepoCommit) {
				return idx
			}
			continue
		}
		if evt.RepoHandle != nil {
			panic("not handling handle updates yet")
		}
		if evt.RepoMigrate != nil {
			panic("not handling repo migrates yet")
		}
		panic("unhandled event type: " + kind)
	}

	return -1
}
5 changes: 3 additions & 2 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent)
ctx, span := otel.Tracer("indexer").Start(ctx, "HandleRepoEvent")
defer span.End()

log.Infow("Handling Repo Event!", "uid", evt.User)
log.Debugw("Handling Repo Event!", "uid", evt.User)

var outops []*comatproto.SyncSubscribeRepos_RepoOp
for _, op := range evt.Ops {
Expand Down Expand Up @@ -113,7 +113,7 @@ func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent)

}

log.Infow("Sending event", "did", did)
log.Debugw("Sending event", "did", did)
if err := ix.events.AddEvent(ctx, &events.XRPCStreamEvent{
RepoCommit: &comatproto.SyncSubscribeRepos_Commit{
Repo: did,
Expand Down Expand Up @@ -867,6 +867,7 @@ func (ix *Indexer) FetchAndIndexRepo(ctx context.Context, job *crawlWork) error
}

// TODO: max size on these? A malicious PDS could just send us a petabyte sized repo here and kill us
log.Infow("SyncGetRepo", "did", ai.Did, "user", ai.Handle, "from", from)
repo, err := comatproto.SyncGetRepo(ctx, c, ai.Did, from, "")
if err != nil {
return fmt.Errorf("failed to fetch repo: %w", err)
Expand Down

0 comments on commit 8c8248d

Please sign in to comment.