Skip to content

Commit

Permalink
linted events, collector and timesync (spacemeshos#1915)
Browse files Browse the repository at this point in the history
linted some other files as well...
  • Loading branch information
antonlerner committed Mar 23, 2020
1 parent 3286fd3 commit 6f103ef
Show file tree
Hide file tree
Showing 21 changed files with 130 additions and 72 deletions.
2 changes: 1 addition & 1 deletion activation/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func (b *Builder) PublishActivationTx() error {
log.String("commitment", commitStr),
log.Int("atx_size", size),
)
events.Publish(events.AtxCreated{Created: true, Id: atx.ShortString(), Layer: uint64(b.currentEpoch())})
events.Publish(events.AtxCreated{Created: true, ID: atx.ShortString(), Layer: uint64(b.currentEpoch())})

select {
case <-atxReceived:
Expand Down
4 changes: 2 additions & 2 deletions activation/activationdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestATX_ActiveSetForLayerView(t *testing.T) {
hash, err := atx.NIPSTChallenge.Hash()
assert.NoError(t, err)
atx.Nipst = NewNIPSTWithChallenge(hash, poetRef)
//layers.AtxDB.(*AtxDbMock).AddAtx(atx.Id(), atx)
//layers.AtxDB.(*AtxDbMock).AddAtx(atx.ID(), atx)
}
id := atxs[4].Id()
fmt.Println("ID4 ", id.ShortString())
Expand Down Expand Up @@ -664,7 +664,7 @@ func TestActivationDB_ValidateAtxErrors(t *testing.T) {
err = SignAtx(signer, atx)
assert.NoError(t, err)
err = atxdb.SyntacticallyValidateAtx(atx)
assert.EqualError(t, err, fmt.Sprintf("previous ATX belongs to different miner. atx.Id: %v, atx.NodeId: %v, prevAtx.NodeId: %v", atx.ShortString(), atx.NodeId.Key, atxs[0].NodeId.Key))
assert.EqualError(t, err, fmt.Sprintf("previous ATX belongs to different miner. atx.ID: %v, atx.NodeId: %v, prevAtx.NodeId: %v", atx.ShortString(), atx.NodeId.Key, atxs[0].NodeId.Key))

// Wrong layerId.
posAtx2 := types.NewActivationTxForTests(idx2, 0, *types.EmptyAtxId, 1020, 0, *types.EmptyAtxId, coinbase, 3, blocks, npst)
Expand Down
4 changes: 2 additions & 2 deletions activation/atxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (db *ActivationDb) deleteLock(viewHash types.Hash12) {
// - ATX LayerID is NipstLayerTime or less after the PositioningATX LayerID.
// - The ATX view of the previous epoch contains ActiveSetSize activations.
func (db *ActivationDb) SyntacticallyValidateAtx(atx *types.ActivationTx) error {
events.Publish(events.NewAtx{Id: atx.ShortString(), LayerId: uint64(atx.PubLayerIdx.GetEpoch(db.LayersPerEpoch))})
events.Publish(events.NewAtx{ID: atx.ShortString(), LayerID: uint64(atx.PubLayerIdx.GetEpoch(db.LayersPerEpoch))})
pub, err := ExtractPublicKey(atx)
if err != nil {
return fmt.Errorf("cannot validate atx sig atx id %v err %v", atx.ShortString(), err)
Expand All @@ -350,7 +350,7 @@ func (db *ActivationDb) SyntacticallyValidateAtx(atx *types.ActivationTx) error
}

if prevATX.NodeId.Key != atx.NodeId.Key {
return fmt.Errorf("previous ATX belongs to different miner. atx.Id: %v, atx.NodeId: %v, prevAtx.NodeId: %v",
return fmt.Errorf("previous ATX belongs to different miner. atx.ID: %v, atx.NodeId: %v, prevAtx.NodeId: %v",
atx.ShortString(), atx.NodeId.Key, prevATX.NodeId.Key)
}

Expand Down
9 changes: 5 additions & 4 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ import (
type eventsCollector struct {
url string
stop chan struct{}
db Db
db DB
}

// NewCollector created a new instance of the collector listening on url for events and writing them to the provided Db
func NewCollector(db Db, url string) *eventsCollector {
// NewCollector created a new instance of the collector listening on url for events and writing them to the provided DB
func NewCollector(db DB, url string) *eventsCollector {
return &eventsCollector{url, make(chan struct{}), db}
}

type Db interface {
// DB defines which events should be stores by any db that the collector uses
type DB interface {
StoreBlock(event *events.NewBlock) error
StoreBlockValid(event *events.ValidBlock) error
StoreTx(event *events.NewTx) error
Expand Down
12 changes: 6 additions & 6 deletions collector/colletor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,23 +81,23 @@ func TestCollectEvents(t *testing.T) {
c.Start(false)
time.Sleep(2 * time.Second)
for i := 0; i < 10010; i++ {
orig := events.NewBlock{Layer: 1, Id: "234"}
orig := events.NewBlock{Layer: 1, ID: "234"}
err = eventPublisher.PublishEvent(orig)
}

orig1 := events.ValidBlock{Id: "234", Valid: true}
orig1 := events.ValidBlock{ID: "234", Valid: true}
err = eventPublisher.PublishEvent(orig1)

orig2 := events.NewAtx{Id: "1234"}
orig2 := events.NewAtx{ID: "1234"}
err = eventPublisher.PublishEvent(orig2)

orig3 := events.ValidAtx{Id: "1234", Valid: true}
orig3 := events.ValidAtx{ID: "1234", Valid: true}
err = eventPublisher.PublishEvent(orig3)

orig4 := events.NewTx{Id: "4321", Amount: 400, Destination: "1234567", Origin: "876543"}
orig4 := events.NewTx{ID: "4321", Amount: 400, Destination: "1234567", Origin: "876543"}
err = eventPublisher.PublishEvent(orig4)

orig5 := events.ValidTx{Id: "4321", Valid: true}
orig5 := events.ValidTx{ID: "4321", Valid: true}
err = eventPublisher.PublishEvent(orig5)

time.Sleep(1 * time.Second)
Expand Down
35 changes: 27 additions & 8 deletions collector/memory_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"sync"
)

// MemoryCollector is an in memory database to store collected events
type MemoryCollector struct {
events map[events.ChannelId][]Event
events map[events.ChannelID][]Event
doneCreatingBlockEvent map[uint64][]*events.DoneCreatingBlock
doneCreatingAtxEvent map[uint64][]*events.AtxCreated
createdAtxs map[uint64][]string
Expand All @@ -18,9 +19,10 @@ type MemoryCollector struct {
lck sync.RWMutex
}

// NewMemoryCollector initializes a new memory collector that stores collected events
func NewMemoryCollector() *MemoryCollector {
return &MemoryCollector{
events: make(map[events.ChannelId][]Event),
events: make(map[events.ChannelID][]Event),
doneCreatingBlockEvent: make(map[uint64][]*events.DoneCreatingBlock),
doneCreatingAtxEvent: make(map[uint64][]*events.AtxCreated),
gotBlockEvent: make(map[uint64][]*events.NewBlock),
Expand All @@ -30,10 +32,12 @@ func NewMemoryCollector() *MemoryCollector {
}
}

// Event defines global interface for all events to help identify their types, which is channel id
type Event interface {
GetChannel() events.ChannelId
GetChannel() events.ChannelID
}

// StoreBlock stores block received event by layer id
func (c *MemoryCollector) StoreBlock(event *events.NewBlock) error {
c.lck.Lock()
c.events[event.GetChannel()] = append(c.events[event.GetChannel()], event)
Expand All @@ -42,50 +46,57 @@ func (c *MemoryCollector) StoreBlock(event *events.NewBlock) error {
return nil
}

// StoreBlockValid stores block validity event
func (c *MemoryCollector) StoreBlockValid(event *events.ValidBlock) error {
/*c.lck.Lock()
c.events[event.GetChannel()] = append(c.events[event.GetChannel()], event)
c.lck.Unlock()*/
return nil
}

// StoreTx stores tx received event, currently commented out because not used
func (c *MemoryCollector) StoreTx(event *events.NewTx) error {
/*c.lck.Lock()
c.events[event.GetChannel()] = append(c.events[event.GetChannel()], event)
c.lck.Unlock()*/
return nil
}

// StoreTxValid stored tx validity event. currently commented out because not used
func (c *MemoryCollector) StoreTxValid(event *events.ValidTx) error {
/*c.lck.Lock()
c.events[event.GetChannel()] = append(c.events[event.GetChannel()], event)
c.lck.Unlock()*/
return nil
}

// StoreAtx stores created ATX by epoch
func (c *MemoryCollector) StoreAtx(event *events.NewAtx) error {
c.lck.Lock()
c.events[event.GetChannel()] = append(c.events[event.GetChannel()], event)
c.gotAtxEvent[event.LayerId] = append(c.gotAtxEvent[event.LayerId], event)
c.Atxs[event.Id]++
c.gotAtxEvent[event.LayerID] = append(c.gotAtxEvent[event.LayerID], event)
c.Atxs[event.ID]++
c.lck.Unlock()
return nil
}

// StoreAtxValid stores an atx valididty event. not used thus commented out
func (c *MemoryCollector) StoreAtxValid(event *events.ValidAtx) error {
/*c.lck.Lock()
c.events[event.GetChannel()] = append(c.events[event.GetChannel()], event)
c.lck.Unlock()*/
return nil
}

// StoreReward should store reward received but for now is is commented out since not used
func (c *MemoryCollector) StoreReward(event *events.RewardReceived) error {
/*c.lck.Lock()
c.events[event.GetChannel()] = append(c.events[event.GetChannel()], event)
c.lck.Unlock()*/
return nil
}

// StoreBlockCreated stores block created events by layer
func (c *MemoryCollector) StoreBlockCreated(event *events.DoneCreatingBlock) error {
c.lck.Lock()
c.events[event.GetChannel()] = append(c.events[event.GetChannel()], event)
Expand All @@ -94,47 +105,54 @@ func (c *MemoryCollector) StoreBlockCreated(event *events.DoneCreatingBlock) err
return nil
}

// StoreAtxCreated stores atx created events received by epoch
func (c *MemoryCollector) StoreAtxCreated(event *events.AtxCreated) error {
c.lck.Lock()
c.events[event.GetChannel()] = append(c.events[event.GetChannel()], event)
c.doneCreatingAtxEvent[event.Layer] = append(c.doneCreatingAtxEvent[event.Layer], event)
if event.Created {
c.createdAtxs[event.Layer] = append(c.createdAtxs[event.Layer], event.Id)
c.createdAtxs[event.Layer] = append(c.createdAtxs[event.Layer], event.ID)
}
c.lck.Unlock()
return nil
}

func (c *MemoryCollector) GetAtxCreationDone(layer types.EpochId) int {
// GetAtxCreationDone get number of atxs that were crated by this miner events for the provided epoch
func (c *MemoryCollector) GetAtxCreationDone(epoch types.EpochId) int {
c.lck.RLock()
defer c.lck.RUnlock()
return len(c.doneCreatingAtxEvent[uint64(layer)])
return len(c.doneCreatingAtxEvent[uint64(epoch)])
}

// GetCreatedAtx returns the number of created atx events received for the epoch
func (c *MemoryCollector) GetCreatedAtx(epoch types.EpochId) []string {
c.lck.RLock()
defer c.lck.RUnlock()
return c.createdAtxs[uint64(epoch)]
}

// GetNumOfCreatedATXs returns numbe of created atxs per layer
func (c *MemoryCollector) GetNumOfCreatedATXs(layer types.LayerID) int {
c.lck.RLock()
defer c.lck.RUnlock()
return len(c.createdAtxs[uint64(layer)])
}

// GetReceivedATXsNum returns the number of atx received events received for the provided layer
func (c *MemoryCollector) GetReceivedATXsNum(layer types.LayerID) int {
c.lck.RLock()
defer c.lck.RUnlock()
return len(c.gotAtxEvent[uint64(layer)])
}

// GetBlockCreationDone returns number of blocks created events for the given layer
func (c *MemoryCollector) GetBlockCreationDone(layer types.LayerID) int {
c.lck.RLock()
defer c.lck.RUnlock()
return len(c.doneCreatingBlockEvent[uint64(layer)])
}

// GetNumOfCreatedBlocks returns number of blocks created events received by this miner
func (c *MemoryCollector) GetNumOfCreatedBlocks(layer types.LayerID) int {
c.lck.RLock()
defer c.lck.RUnlock()
Expand All @@ -147,6 +165,7 @@ func (c *MemoryCollector) GetNumOfCreatedBlocks(layer types.LayerID) int {
return created
}

// GetReceivedBlocks returns number of blocks received events received by this miner
func (c *MemoryCollector) GetReceivedBlocks(layer types.LayerID) int {
c.lck.RLock()
defer c.lck.RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion events/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestNewBlockEvent(t *testing.T) {
s.StartListening()
time.Sleep(5 * time.Second)

orig := NewBlock{Layer: 1, Id: "234"}
orig := NewBlock{Layer: 1, ID: "234"}
err = eventPublisher.PublishEvent(orig)
assert.NoError(t, err)

Expand Down
Loading

0 comments on commit 6f103ef

Please sign in to comment.