diff --git a/chain/gen/gen.go b/chain/gen/gen.go index fe7a76f95bd..ab911a1095c 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -31,7 +31,7 @@ import ( var log = logging.Logger("gen") -const msgsPerBlock = 20 +const msgsPerBlock = 0 type ChainGen struct { accounts []address.Address @@ -104,7 +104,12 @@ func NewGenerator() (*ChainGen, error) { return nil, xerrors.Errorf("creating memrepo wallet failed: %w", err) } - worker, err := w.GenerateKey(types.KTBLS) + worker1, err := w.GenerateKey(types.KTBLS) + if err != nil { + return nil, xerrors.Errorf("failed to generate worker key: %w", err) + } + + worker2, err := w.GenerateKey(types.KTBLS) if err != nil { return nil, xerrors.Errorf("failed to generate worker key: %w", err) } @@ -123,18 +128,20 @@ func NewGenerator() (*ChainGen, error) { } minercfg := &GenMinerCfg{ - Workers: []address.Address{worker, worker}, - Owners: []address.Address{worker, worker}, + Workers: []address.Address{worker1, worker2}, + Owners: []address.Address{worker1, worker2}, PeerIDs: []peer.ID{"peerID1", "peerID2"}, } genb, err := MakeGenesisBlock(bs, map[address.Address]types.BigInt{ - worker: types.FromFil(40000), - banker: types.FromFil(50000), + worker1: types.FromFil(40000), + worker2: types.FromFil(40000), + banker: types.FromFil(50000), }, minercfg, 100000) if err != nil { return nil, xerrors.Errorf("make genesis block failed: %w", err) } + fmt.Println("MINER CFG ADDRESSES: ", minercfg.MinerAddrs) cs := store.NewChainStore(bs, ds) @@ -191,8 +198,7 @@ func (cg *ChainGen) GenesisCar() ([]byte, error) { return out.Bytes(), nil } -func (cg *ChainGen) nextBlockProof(ctx context.Context, m address.Address, ticks []*types.Ticket) (types.ElectionProof, *types.Ticket, error) { - pts := cg.CurTipset.TipSet() +func (cg *ChainGen) nextBlockProof(ctx context.Context, pts *types.TipSet, m address.Address, ticks []*types.Ticket) (types.ElectionProof, *types.Ticket, error) { var lastTicket *types.Ticket if len(ticks) == 0 { @@ -201,7 +207,7 @@ func (cg *ChainGen) nextBlockProof(ctx context.Context, m address.Address, ticks lastTicket = ticks[len(ticks)-1] } - st := cg.CurTipset.TipSet().ParentState() + st := pts.ParentState() worker, err := stmgr.GetMinerWorker(ctx, cg.sm, st, m) if err != nil { @@ -261,7 +267,7 @@ func (cg *ChainGen) NextTipSetFromMiners(base *types.TipSet, miners []address.Ad for len(blks) == 0 { for i, m := range miners { - proof, t, err := cg.nextBlockProof(context.TODO(), m, ticketSets[i]) + proof, t, err := cg.nextBlockProof(context.TODO(), base, m, ticketSets[i]) if err != nil { return nil, xerrors.Errorf("next block proof: %w", err) } diff --git a/chain/gen/utils.go b/chain/gen/utils.go index 75feea3ead5..af42e4c2eba 100644 --- a/chain/gen/utils.go +++ b/chain/gen/utils.go @@ -202,7 +202,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sroot cid.Cid // TODO: hardcoding 7000000 here is a little fragile, it changes any // time anyone changes the initial account allocations - rval, err := doExecValue(ctx, vm, actors.StorageMarketAddress, owner, types.FromFil(6000), actors.SMAMethods.CreateStorageMiner, params) + rval, err := doExecValue(ctx, vm, actors.StorageMarketAddress, owner, types.FromFil(6500), actors.SMAMethods.CreateStorageMiner, params) if err != nil { return cid.Undef, xerrors.Errorf("failed to create genesis miner: %w", err) } diff --git a/chain/messagepool.go b/chain/messagepool.go index c15af405480..b241a91762f 100644 --- a/chain/messagepool.go +++ b/chain/messagepool.go @@ -87,7 +87,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error { return err } - log.Infof("mpooladd: %d %s", m.Message.Nonce, base64.StdEncoding.EncodeToString(data)) + log.Debugf("mpooladd: %d %s", m.Message.Nonce, base64.StdEncoding.EncodeToString(data)) if err := m.Signature.Verify(m.Message.From, data); err != nil { log.Warnf("mpooladd signature verification failed: %s", err) diff --git a/chain/store/store.go b/chain/store/store.go index d823999691e..eba53709767 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -173,31 +173,30 @@ func (cs *ChainStore) SubscribeHeadChanges(f func(rev, app []*types.TipSet) erro } func (cs *ChainStore) SetGenesis(b *types.BlockHeader) error { - fts := &FullTipSet{ - Blocks: []*types.FullBlock{ - {Header: b}, - }, + ts, err := types.NewTipSet([]*types.BlockHeader{b}) + if err != nil { + return err } - if err := cs.PutTipSet(fts); err != nil { + if err := cs.PutTipSet(ts); err != nil { return err } return cs.ds.Put(dstore.NewKey("0"), b.Cid().Bytes()) } -func (cs *ChainStore) PutTipSet(ts *FullTipSet) error { - for _, b := range ts.Blocks { - if err := cs.persistBlock(b); err != nil { +func (cs *ChainStore) PutTipSet(ts *types.TipSet) error { + for _, b := range ts.Blocks() { + if err := cs.PersistBlockHeader(b); err != nil { return err } } - expanded, err := cs.expandTipset(ts.TipSet().Blocks()[0]) + expanded, err := cs.expandTipset(ts.Blocks()[0]) if err != nil { return xerrors.Errorf("errored while expanding tipset: %w", err) } - log.Debugf("expanded %s into %s\n", ts.TipSet().Cids(), expanded.Cids()) + log.Debugf("expanded %s into %s\n", ts.Cids(), expanded.Cids()) if err := cs.MaybeTakeHeavierTipSet(expanded); err != nil { return errors.Wrap(err, "MaybeTakeHeavierTipSet failed in PutTipSet") @@ -373,24 +372,6 @@ func (cs *ChainStore) PersistBlockHeader(b *types.BlockHeader) error { return cs.bs.Put(sb) } -func (cs *ChainStore) persistBlock(b *types.FullBlock) error { - if err := cs.PersistBlockHeader(b.Header); err != nil { - return err - } - - for _, m := range b.BlsMessages { - if _, err := cs.PutMessage(m); err != nil { - return err - } - } - for _, m := range b.SecpkMessages { - if _, err := cs.PutMessage(m); err != nil { - return err - } - } - return nil -} - type storable interface { ToStorageBlock() (block.Block, error) } diff --git a/chain/sync.go b/chain/sync.go index e75396c426e..88f60e97d78 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -94,11 +94,19 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { if fts == nil { panic("bad") } + + for _, b := range fts.Blocks { + if err := syncer.validateMsgMeta(b); err != nil { + log.Warnf("invalid block received: %s", err) + return + } + } + if from == syncer.self { // TODO: this is kindof a hack... log.Info("got block from ourselves") - if err := syncer.Sync(fts); err != nil { + if err := syncer.Sync(fts.TipSet()); err != nil { log.Errorf("failed to sync our own block: %+v", err) } @@ -110,12 +118,37 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { syncer.Bsync.AddPeer(from) go func() { - if err := syncer.Sync(fts); err != nil { + if err := syncer.Sync(fts.TipSet()); err != nil { log.Errorf("sync error: %+v", err) } }() } +func (syncer *Syncer) validateMsgMeta(fblk *types.FullBlock) error { + var bcids, scids []cbg.CBORMarshaler + for _, m := range fblk.BlsMessages { + c := cbg.CborCid(m.Cid()) + bcids = append(bcids, &c) + } + + for _, m := range fblk.SecpkMessages { + c := cbg.CborCid(m.Cid()) + scids = append(scids, &c) + } + + bs := amt.WrapBlockstore(syncer.store.Blockstore()) + smroot, err := computeMsgMeta(bs, bcids, scids) + if err != nil { + return xerrors.Errorf("validating msgmeta, compute failed: %w", err) + } + + if fblk.Header.Messages != smroot { + return xerrors.Errorf("messages in full block did not match msgmeta root in header (%s != %s)", fblk.Header.Messages, smroot) + } + + return nil +} + func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) { // TODO: search for other blocks that could form a tipset with this block // and then send that tipset to InformNewHead @@ -171,11 +204,6 @@ func zipTipSetAndMessages(bs amt.Blocks, ts *types.TipSet, allbmsgs []*types.Mes smsgCids = append(smsgCids, &c) } - smroot, err := amt.FromArray(bs, smsgCids) - if err != nil { - return nil, err - } - var bmsgs []*types.Message var bmsgCids []cbg.CBORMarshaler for _, m := range bmi[bi] { @@ -184,15 +212,7 @@ func zipTipSetAndMessages(bs amt.Blocks, ts *types.TipSet, allbmsgs []*types.Mes bmsgCids = append(bmsgCids, &c) } - bmroot, err := amt.FromArray(bs, bmsgCids) - if err != nil { - return nil, err - } - - mrcid, err := bs.Put(&types.MsgMeta{ - BlsMessages: bmroot, - SecpkMessages: smroot, - }) + mrcid, err := computeMsgMeta(bs, bmsgCids, smsgCids) if err != nil { return nil, err } @@ -213,6 +233,28 @@ func zipTipSetAndMessages(bs amt.Blocks, ts *types.TipSet, allbmsgs []*types.Mes return fts, nil } +func computeMsgMeta(bs amt.Blocks, bmsgCids, smsgCids []cbg.CBORMarshaler) (cid.Cid, error) { + bmroot, err := amt.FromArray(bs, bmsgCids) + if err != nil { + return cid.Undef, err + } + + smroot, err := amt.FromArray(bs, smsgCids) + if err != nil { + return cid.Undef, err + } + + mrcid, err := bs.Put(&types.MsgMeta{ + BlsMessages: bmroot, + SecpkMessages: smroot, + }) + if err != nil { + return cid.Undef, xerrors.Errorf("failed to put msgmeta: %w", err) + } + + return mrcid, nil +} + func (syncer *Syncer) selectHead(heads map[peer.ID]*types.TipSet) (*types.TipSet, error) { var headsArr []*types.TipSet for _, ts := range heads { @@ -289,12 +331,12 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro return fts, nil } -func (syncer *Syncer) Sync(maybeHead *store.FullTipSet) error { +func (syncer *Syncer) Sync(maybeHead *types.TipSet) error { + syncer.syncLock.Lock() defer syncer.syncLock.Unlock() - ts := maybeHead.TipSet() - if syncer.Genesis.Equals(ts) || syncer.store.GetHeaviestTipSet().Equals(ts) { + if syncer.Genesis.Equals(maybeHead) || syncer.store.GetHeaviestTipSet().Equals(maybeHead) { return nil } @@ -384,7 +426,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err baseTs, err := syncer.store.LoadTipSet(h.Parents) if err != nil { - return xerrors.Errorf("load tipset failed: %w", err) + return xerrors.Errorf("load parent tipset failed (%s): %w", h.Parents, err) } stateroot, precp, err := syncer.sm.TipSetState(baseTs) @@ -476,16 +518,43 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err return nil } + bs := amt.WrapBlockstore(syncer.store.Blockstore()) + var blsCids []cbg.CBORMarshaler for i, m := range b.BlsMessages { if err := checkMsg(m); err != nil { - xerrors.Errorf("block had invalid bls message at index %d: %w", i, err) + return xerrors.Errorf("block had invalid bls message at index %d: %w", i, err) } + c := cbg.CborCid(m.Cid()) + blsCids = append(blsCids, &c) + } + bmroot, err := amt.FromArray(bs, blsCids) + if err != nil { + return xerrors.Errorf("failed to build amt from bls msg cids: %w", err) } + var secpkCids []cbg.CBORMarshaler for i, m := range b.SecpkMessages { if err := checkMsg(&m.Message); err != nil { - xerrors.Errorf("block had invalid secpk message at index %d: %w", i, err) + return xerrors.Errorf("block had invalid secpk message at index %d: %w", i, err) } + c := cbg.CborCid(m.Cid()) + secpkCids = append(secpkCids, &c) + } + smroot, err := amt.FromArray(bs, secpkCids) + if err != nil { + return xerrors.Errorf("failed to build amt from bls msg cids: %w", err) + } + + mrcid, err := bs.Put(&types.MsgMeta{ + BlsMessages: bmroot, + SecpkMessages: smroot, + }) + if err != nil { + return err + } + + if h.Messages != mrcid { + return fmt.Errorf("messages didnt match message root in header") } return nil @@ -592,6 +661,7 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu return err } if fts == nil { + fmt.Println("Failed to fill tipset for: ", beg, headers[beg].Cids(), headers[beg].Height()) break } if err := cb(fts); err != nil { @@ -671,10 +741,10 @@ func persistMessages(bs bstore.Blockstore, bst *BSTipSet) error { return nil } -func (syncer *Syncer) collectChain(fts *store.FullTipSet) error { - syncer.syncState.Init(syncer.store.GetHeaviestTipSet(), fts.TipSet()) +func (syncer *Syncer) collectChain(ts *types.TipSet) error { + syncer.syncState.Init(syncer.store.GetHeaviestTipSet(), ts) - headers, err := syncer.collectHeaders(fts.TipSet(), syncer.store.GetHeaviestTipSet()) + headers, err := syncer.collectHeaders(ts, syncer.store.GetHeaviestTipSet()) if err != nil { return err } diff --git a/chain/sync_test.go b/chain/sync_test.go index 51f9e6af905..00cb12d9371 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "testing" + "time" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/peer" @@ -30,6 +31,8 @@ func (tu *syncTestUtil) repoWithChain(t testing.TB, h int) (repo.Repo, []byte, [ mts, err := tu.g.NextTipSet() require.NoError(t, err) + fmt.Println("REPO WITH CHAIN NEXT TIPSET: ", mts.TipSet.TipSet().Height()) + blks[i] = mts.TipSet ts := mts.TipSet.TipSet() @@ -93,6 +96,45 @@ func (tu *syncTestUtil) Shutdown() { tu.cancel() } +func (tu *syncTestUtil) pushFtsAndWait(to int, fts *store.FullTipSet) { + // TODO: would be great if we could pass a whole tipset here... + for _, fb := range fts.Blocks { + var b types.BlockMsg + + // -1 to match block.Height + b.Header = fb.Header + for _, msg := range fb.SecpkMessages { + c, err := tu.nds[to].(*impl.FullNodeAPI).ChainAPI.Chain.PutMessage(msg) + require.NoError(tu.t, err) + + b.SecpkMessages = append(b.SecpkMessages, c) + } + + for _, msg := range fb.BlsMessages { + c, err := tu.nds[to].(*impl.FullNodeAPI).ChainAPI.Chain.PutMessage(msg) + require.NoError(tu.t, err) + + b.BlsMessages = append(b.BlsMessages, c) + } + + require.NoError(tu.t, tu.nds[to].ChainSubmitBlock(tu.ctx, &b)) + + } + + start := time.Now() + h, err := tu.nds[to].ChainHead(tu.ctx) + require.NoError(tu.t, err) + for !h.Equals(fts.TipSet()) { + time.Sleep(time.Millisecond * 50) + h, err = tu.nds[to].ChainHead(tu.ctx) + require.NoError(tu.t, err) + + if time.Since(start) > time.Second*10 { + tu.t.Fatal("took too long waiting for block to be accepted") + } + } +} + func (tu *syncTestUtil) mineOnBlock(blk *store.FullTipSet, src int, miners []int) *store.FullTipSet { if miners == nil { for i := range tu.g.Miners { @@ -101,20 +143,16 @@ func (tu *syncTestUtil) mineOnBlock(blk *store.FullTipSet, src int, miners []int } var maddrs []address.Address - for i := range miners { + for _, i := range miners { maddrs = append(maddrs, tu.g.Miners[i]) } + fmt.Println("Miner mining block: ", maddrs) + mts, err := tu.g.NextTipSetFromMiners(blk.TipSet(), maddrs) require.NoError(tu.t, err) - for _, msg := range mts.Messages { - require.NoError(tu.t, tu.nds[src].MpoolPush(context.TODO(), msg)) - } - - for _, fblk := range mts.TipSet.Blocks { - require.NoError(tu.t, tu.nds[src].ChainSubmitBlock(context.TODO(), fblkToBlkMsg(fblk))) - } + tu.pushFtsAndWait(src, mts.TipSet) return mts.TipSet } @@ -311,6 +349,70 @@ func TestSyncMining(t *testing.T) { } } +func (tu *syncTestUtil) loadChainToNode(to int) { + // utility to simulate incoming blocks without miner process + // TODO: should call syncer directly, this won't work correctly in all cases + + for i := 0; i < len(tu.blocks); i++ { + tu.pushFtsAndWait(to, tu.blocks[i]) + } +} + +func TestSyncFork(t *testing.T) { + H := 10 + tu := prepSyncTest(t, H) + + p1 := tu.addClientNode() + p2 := tu.addClientNode() + + fmt.Println("GENESIS: ", tu.g.Genesis().Cid()) + tu.loadChainToNode(p1) + tu.loadChainToNode(p2) + + phead := func() { + h1, err := tu.nds[1].ChainHead(tu.ctx) + require.NoError(tu.t, err) + + h2, err := tu.nds[2].ChainHead(tu.ctx) + require.NoError(tu.t, err) + + fmt.Println("Node 1: ", h1.Cids(), h1.Parents(), h1.Height()) + fmt.Println("Node 2: ", h2.Cids(), h1.Parents(), h2.Height()) + //time.Sleep(time.Second * 2) + fmt.Println() + fmt.Println() + fmt.Println() + fmt.Println() + } + + phead() + + base := tu.g.CurTipset + fmt.Println("Mining base: ", base.TipSet().Cids(), base.TipSet().Height()) + a := tu.mineOnBlock(base, p1, []int{0}) + b := tu.mineOnBlock(base, p2, []int{1}) + phead() + + a = tu.mineOnBlock(a, p1, []int{0}) + b = tu.mineOnBlock(b, p2, []int{1}) + phead() + + a = tu.mineOnBlock(a, p1, []int{0}) + b = tu.mineOnBlock(b, p2, []int{1}) + phead() + + fmt.Println("A: ", a.Cids(), a.TipSet().Height()) + fmt.Println("B: ", b.Cids(), b.TipSet().Height()) + + // Now for the fun part!! + + require.NoError(t, tu.mn.LinkAll()) + tu.connect(p1, p2) + tu.waitUntilSync(p1, p2) + + phead() +} + func BenchmarkSyncBasic(b *testing.B) { for i := 0; i < b.N; i++ { runSyncBenchLength(b, 100) diff --git a/chain/types/tipset.go b/chain/types/tipset.go index 688edacd4c4..0dc877ec536 100644 --- a/chain/types/tipset.go +++ b/chain/types/tipset.go @@ -56,7 +56,7 @@ func tipsetSortFunc(blks []*BlockHeader) func(i, j int) bool { tj := blks[j].LastTicket() if ti.Equals(tj) { - log.Warn("blocks have same ticket") + //log.Warn("blocks have same ticket") return blks[i].Cid().KeyString() < blks[j].Cid().KeyString() }