Skip to content

Commit

Permalink
Feature: Faster start & stop times (thrasher-corp#648)
Browse files Browse the repository at this point in the history
* Updates starting and stopping routines to be a bit more parallel with less waiting required

* Removes stop, removes debugging output

* linting and test fixes

* Add extra kill switch for exiting on exchange loading delay

* Fixes fun math

* breaks loop instead of switch. Moves param warns higher

* Removes unceccary gos. passes in cfg to remove data race

* Removes os signal processing. Fixes bad master merge
  • Loading branch information
gloriousCode authored Mar 22, 2021
1 parent 46f7195 commit 3c72a19
Show file tree
Hide file tree
Showing 18 changed files with 209 additions and 200 deletions.
5 changes: 4 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1636,7 +1636,10 @@ func (c *Config) SaveConfigToFile(configPath string) error {
}
defer func() {
if writer != nil {
writer.Close()
err = writer.Close()
if err != nil {
log.Error(log.Global, err)
}
}
}()
return c.Save(provider, func() ([]byte, error) { return PromptForConfigKey(true) })
Expand Down
2 changes: 1 addition & 1 deletion connchecker/connchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (c *Checker) Monitor(wg *sync.WaitGroup) {
for {
select {
case <-tick.C:
c.connectionTest()
go c.connectionTest()
case <-c.shutdown:
return
}
Expand Down
20 changes: 12 additions & 8 deletions currency/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,16 +257,20 @@ func (s *Storage) ForeignExchangeUpdater() {
return

case <-SeedForeignExchangeTick.C:
err := s.SeedForeignExchangeRates()
if err != nil {
log.Errorln(log.Global, err)
}
go func() {
err := s.SeedForeignExchangeRates()
if err != nil {
log.Errorln(log.Global, err)
}
}()

case <-SeedCurrencyAnalysisTick.C:
err := s.SeedCurrencyAnalysisData()
if err != nil {
log.Errorln(log.Global, err)
}
go func() {
err := s.SeedCurrencyAnalysisData()
if err != nil {
log.Errorln(log.Global, err)
}
}()
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/gofrs/uuid"
"github.com/thrasher-corp/gocryptotrader/engine/subsystem"
"github.com/thrasher-corp/gocryptotrader/log"
)

Expand Down Expand Up @@ -79,7 +80,7 @@ func SpawnWorker() error {
// configuration, then spawns workers
func (d *Dispatcher) start(workers, channelCapacity int) error {
if atomic.LoadUint32(&d.running) == 1 {
return errors.New(errAlreadyStarted)
return fmt.Errorf("dispatcher %w", subsystem.ErrSubSystemAlreadyStarted)
}

if workers < 1 {
Expand Down Expand Up @@ -114,7 +115,7 @@ func (d *Dispatcher) start(workers, channelCapacity int) error {
// stop stops the service and shuts down all worker routines
func (d *Dispatcher) stop() error {
if !atomic.CompareAndSwapUint32(&d.running, 1, 0) {
return errors.New(errCannotShutdown)
return fmt.Errorf("dispatcher %w", subsystem.ErrSubSystemNotStarted)
}
close(d.shutdown)
ch := make(chan struct{})
Expand Down Expand Up @@ -176,7 +177,7 @@ func (d *Dispatcher) spawnWorker() error {
return nil
}

// Relayer routine relays communications across the defined routes
// relayer routine relays communications across the defined routes
func (d *Dispatcher) relayer(i *sync.WaitGroup) {
atomic.AddInt32(&d.count, 1)
d.wg.Add(1)
Expand Down
2 changes: 0 additions & 2 deletions dispatch/dispatch_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ const (
DefaultHandshakeTimeout = 200 * time.Nanosecond

errNotInitialised = "dispatcher not initialised"
errAlreadyStarted = "dispatcher already started"
errCannotShutdown = "dispatcher cannot shutdown, already stopped"
errShutdownRoutines = "dispatcher did not shutdown properly, routines failed to close"
)

Expand Down
19 changes: 8 additions & 11 deletions engine/comms_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ package engine

import (
"errors"
"fmt"
"sync/atomic"

"github.com/thrasher-corp/gocryptotrader/communications"
"github.com/thrasher-corp/gocryptotrader/communications/base"
"github.com/thrasher-corp/gocryptotrader/engine/subsystem"
"github.com/thrasher-corp/gocryptotrader/log"
)

// commsManager starts the NTP manager
type commsManager struct {
started int32
stopped int32
shutdown chan struct{}
relayMsg chan base.Event
comms *communications.Communications
Expand All @@ -23,8 +24,8 @@ func (c *commsManager) Started() bool {
}

func (c *commsManager) Start() (err error) {
if atomic.AddInt32(&c.started, 1) != 1 {
return errors.New("communications manager already started")
if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
return fmt.Errorf("communications manager %w", subsystem.ErrSubSystemAlreadyStarted)
}

defer func() {
Expand Down Expand Up @@ -56,13 +57,11 @@ func (c *commsManager) GetStatus() (map[string]base.CommsStatus, error) {

func (c *commsManager) Stop() error {
if atomic.LoadInt32(&c.started) == 0 {
return errors.New("communications manager not started")
return fmt.Errorf("communications manager %w", subsystem.ErrSubSystemNotStarted)
}

if atomic.AddInt32(&c.stopped, 1) != 1 {
return errors.New("communications manager is already stopped")
}

defer func() {
atomic.CompareAndSwapInt32(&c.started, 1, 0)
}()
close(c.shutdown)
log.Debugln(log.CommunicationMgr, "Communications manager shutting down...")
return nil
Expand All @@ -82,8 +81,6 @@ func (c *commsManager) PushEvent(evt base.Event) {
func (c *commsManager) run() {
defer func() {
// TO-DO shutdown comms connections for connected services (Slack etc)
atomic.CompareAndSwapInt32(&c.stopped, 1, 0)
atomic.CompareAndSwapInt32(&c.started, 1, 0)
log.Debugln(log.CommunicationMgr, "Communications manager shutdown.")
}()

Expand Down
19 changes: 8 additions & 11 deletions engine/connection.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package engine

import (
"errors"
"fmt"
"sync/atomic"

"github.com/thrasher-corp/gocryptotrader/config"
"github.com/thrasher-corp/gocryptotrader/connchecker"
"github.com/thrasher-corp/gocryptotrader/engine/subsystem"
"github.com/thrasher-corp/gocryptotrader/log"
)

// connectionManager manages the connchecker
type connectionManager struct {
started int32
stopped int32
conn *connchecker.Checker
}

Expand All @@ -23,8 +23,8 @@ func (c *connectionManager) Started() bool {

// Start starts an instance of the connection manager
func (c *connectionManager) Start(conf *config.ConnectionMonitorConfig) error {
if atomic.AddInt32(&c.started, 1) != 1 {
return errors.New("connection manager already started")
if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
return fmt.Errorf("connection manager %w", subsystem.ErrSubSystemAlreadyStarted)
}

log.Debugln(log.ConnectionMgr, "Connection manager starting...")
Expand All @@ -44,17 +44,14 @@ func (c *connectionManager) Start(conf *config.ConnectionMonitorConfig) error {
// Stop stops the connection manager
func (c *connectionManager) Stop() error {
if atomic.LoadInt32(&c.started) == 0 {
return errors.New("connection manager not started")
}

if atomic.AddInt32(&c.stopped, 1) != 1 {
return errors.New("connection manager is already stopped")
return fmt.Errorf("connection manager %w", subsystem.ErrSubSystemNotStarted)
}
defer func() {
atomic.CompareAndSwapInt32(&c.started, 1, 0)
}()

log.Debugln(log.ConnectionMgr, "Connection manager shutting down...")
c.conn.Shutdown()
atomic.CompareAndSwapInt32(&c.stopped, 1, 0)
atomic.CompareAndSwapInt32(&c.started, 1, 0)
log.Debugln(log.ConnectionMgr, "Connection manager stopped.")
return nil
}
Expand Down
22 changes: 8 additions & 14 deletions engine/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/database"
dbpsql "github.com/thrasher-corp/gocryptotrader/database/drivers/postgres"
dbsqlite3 "github.com/thrasher-corp/gocryptotrader/database/drivers/sqlite3"
"github.com/thrasher-corp/gocryptotrader/engine/subsystem"
"github.com/thrasher-corp/gocryptotrader/log"
"github.com/thrasher-corp/sqlboiler/boil"
)
Expand All @@ -19,7 +20,6 @@ var (

type databaseManager struct {
started int32
stopped int32
shutdown chan struct{}
}

Expand All @@ -28,8 +28,8 @@ func (a *databaseManager) Started() bool {
}

func (a *databaseManager) Start(bot *Engine) (err error) {
if atomic.AddInt32(&a.started, 1) != 1 {
return errors.New("database manager already started")
if !atomic.CompareAndSwapInt32(&a.started, 0, 1) {
return fmt.Errorf("database manager %w", subsystem.ErrSubSystemAlreadyStarted)
}

defer func() {
Expand Down Expand Up @@ -78,12 +78,11 @@ func (a *databaseManager) Start(bot *Engine) (err error) {

func (a *databaseManager) Stop() error {
if atomic.LoadInt32(&a.started) == 0 {
return errors.New("database manager not started")
}

if atomic.AddInt32(&a.stopped, 1) != 1 {
return errors.New("database manager is already stopping")
return fmt.Errorf("database manager %w", subsystem.ErrSubSystemNotStarted)
}
defer func() {
atomic.CompareAndSwapInt32(&a.started, 1, 0)
}()

err := dbConn.SQL.Close()
if err != nil {
Expand All @@ -97,16 +96,11 @@ func (a *databaseManager) Stop() error {
func (a *databaseManager) run(bot *Engine) {
log.Debugln(log.DatabaseMgr, "Database manager started.")
bot.ServicesWG.Add(1)

t := time.NewTicker(time.Second * 2)

defer func() {
t.Stop()
atomic.CompareAndSwapInt32(&a.stopped, 1, 0)
atomic.CompareAndSwapInt32(&a.started, 1, 0)

bot.ServicesWG.Done()

log.Debugln(log.DatabaseMgr, "Database manager shutdown.")
}()

Expand All @@ -115,7 +109,7 @@ func (a *databaseManager) run(bot *Engine) {
case <-a.shutdown:
return
case <-t.C:
a.checkConnection()
go a.checkConnection()
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,17 +396,16 @@ func (bot *Engine) Start() error {
}

gctlog.Debugln(gctlog.Global, "Setting up exchanges..")
bot.SetupExchanges()
if bot.exchangeManager.Len() == 0 {
return errors.New("no exchanges are loaded")
err := bot.SetupExchanges()
if err != nil {
return err
}

if bot.Settings.EnableCommsRelayer {
if err := bot.CommsManager.Start(); err != nil {
if err = bot.CommsManager.Start(); err != nil {
gctlog.Errorf(gctlog.Global, "Communications manager unable to start: %v\n", err)
}
}
var err error
if bot.Settings.EnableCoinmarketcapAnalysis ||
bot.Settings.EnableCurrencyConverter ||
bot.Settings.EnableCurrencyLayer ||
Expand Down Expand Up @@ -556,6 +555,7 @@ func (bot *Engine) Stop() {
gctlog.Errorf(gctlog.DispatchMgr, "Dispatch system unable to stop. Error: %v", err)
}
}

if bot.Settings.EnableCoinmarketcapAnalysis ||
bot.Settings.EnableCurrencyConverter ||
bot.Settings.EnableCurrencyLayer ||
Expand Down
Loading

0 comments on commit 3c72a19

Please sign in to comment.