From 3b167259902d7fc1b97858be3930d3d70bed39d3 Mon Sep 17 00:00:00 2001 From: Isaac Aymerich Date: Thu, 9 Jan 2025 22:51:55 +0100 Subject: [PATCH] more logs --- server/repository/repository.go | 145 ++++++++++++++++++++------------ server/scheduler/scheduler.go | 22 ++++- 2 files changed, 110 insertions(+), 57 deletions(-) diff --git a/server/repository/repository.go b/server/repository/repository.go index 1b46333..046bd35 100644 --- a/server/repository/repository.go +++ b/server/repository/repository.go @@ -17,9 +17,8 @@ var ( ) type Repository interface { - getConnection(ctx context.Context) (Transaction, error) + getConnection(ctx context.Context) (SQLDBOperations, error) Initialize(ctx context.Context) error - ProcessEvent(ctx context.Context, event *model.TaskEvent) error PingServerUpdate(ctx context.Context, name string, ip string) error GetTimeoutJobs(ctx context.Context, timeout time.Duration) ([]*model.TaskEvent, error) GetJobs(ctx context.Context) (*[]model.Job, error) @@ -34,7 +33,7 @@ type Repository interface { RetrieveQueuedJob(ctx context.Context) (*model.Job, error) } -type Transaction interface { +type SQLDBOperations interface { Exec(query string, args ...interface{}) (sql.Result, error) Prepare(query string) (*sql.Stmt, error) Query(query string, args ...interface{}) (*sql.Rows, error) @@ -48,33 +47,82 @@ type SQLTransaction struct { } func (S *SQLTransaction) Exec(query string, args ...interface{}) (sql.Result, error) { + log.Debugf("Exec: %s, args: %v", query, args) return S.tx.Exec(query, args...) } func (S *SQLTransaction) Prepare(query string) (*sql.Stmt, error) { + log.Debugf("Prepare: %s", query) return S.tx.Prepare(query) } func (S *SQLTransaction) Query(query string, args ...interface{}) (*sql.Rows, error) { + log.Debugf("Query: %s, args: %v", query, args) return S.tx.Query(query, args...) } func (S *SQLTransaction) QueryRow(query string, args ...interface{}) *sql.Row { + log.Debugf("QueryRow: %s, args: %v", query, args) return S.tx.QueryRow(query, args...) } func (S *SQLTransaction) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) { + log.Debugf("QueryContext: %s, args: %v", query, args) return S.tx.QueryContext(ctx, query, args...) } func (S *SQLTransaction) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) { + log.Debugf("ExecContext: %s, args: %v", query, args) return S.tx.ExecContext(ctx, query, args...) } +type SQLDatabase struct { + db *sql.DB +} + +func (S *SQLDatabase) Exec(query string, args ...interface{}) (sql.Result, error) { + log.Debugf("Exec query: %s, args: %v", query, args) + return S.db.Exec(query, args...) +} + +func (S *SQLDatabase) Prepare(query string) (*sql.Stmt, error) { + log.Debugf("Prepare: %s", query) + return S.db.Prepare(query) +} + +func (S *SQLDatabase) Query(query string, args ...interface{}) (*sql.Rows, error) { + log.Debugf("Query: %s, args: %v", query, args) + return S.db.Query(query, args...) +} + +func (S *SQLDatabase) QueryRow(query string, args ...interface{}) *sql.Row { + log.Debugf("QueryRow: %s, args: %v", query, args) + return S.db.QueryRow(query, args...) +} + +func (S *SQLDatabase) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) { + log.Debugf("QueryContext: %s, args: %v", query, args) + return S.db.QueryContext(ctx, query, args...) +} + +func (S *SQLDatabase) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) { + log.Debugf("ExecContext: %s, args: %v", query, args) + return S.db.ExecContext(ctx, query, args...) +} + +func (S *SQLDatabase) BeginTx(ctx context.Context, s *sql.TxOptions) (*sql.Tx, error) { + log.Debugf("BeginTx") + tx, err := S.db.BeginTx(ctx, s) + if err != nil { + return nil, err + } + return tx, nil +} + type SQLRepository struct { - db *sql.DB - con Transaction + db *SQLDatabase + tx SQLDBOperations } type SQLServerConfig struct { @@ -95,6 +143,7 @@ func NewSQLRepository(config SQLServerConfig) (*SQLRepository, error) { db.SetMaxOpenConns(5) db.SetConnMaxLifetime(0) db.SetMaxIdleConns(5) + /* go func(){ for { fmt.Printf("In use %d not use %d open %d wait %d\n",db.Stats().Idle, db.Stats().InUse, db.Stats().OpenConnections,db.Stats().WaitCount) @@ -102,7 +151,7 @@ func NewSQLRepository(config SQLServerConfig) (*SQLRepository, error) { } }()*/ return &SQLRepository{ - db: db, + db: &SQLDatabase{db}, }, nil } @@ -111,17 +160,6 @@ func (S *SQLRepository) Initialize(ctx context.Context) error { return S.prepareDatabase(ctx) } -func (S *SQLRepository) ProcessEvent(ctx context.Context, taskEvent *model.TaskEvent) error { - switch taskEvent.EventType { - case model.PingEvent: - return S.PingServerUpdate(ctx, taskEvent.WorkerName, taskEvent.IP) - case model.NotificationEvent: - return S.AddNewTaskEvent(ctx, taskEvent) - default: - return fmt.Errorf("unknown event type %s", taskEvent.EventType) - } -} - //go:embed resources/database/*.sql var databaseSchemas embed.FS @@ -134,12 +172,8 @@ func (S *SQLRepository) prepareDatabase(ctx context.Context) error { } // Ensure `schema_version` table exists - const createTableSchemaVersion = ` - CREATE TABLE IF NOT EXISTS schema_version ( - version INT PRIMARY KEY, - applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ); - ` + const createTableSchemaVersion = `CREATE TABLE IF NOT EXISTS schema_version (version INT PRIMARY KEY, applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP );` + _, err = con.ExecContext(ctx, createTableSchemaVersion) if err != nil { return fmt.Errorf("failed to ensure schema_version table: %w", err) @@ -205,10 +239,10 @@ func (S *SQLRepository) prepareDatabase(ctx context.Context) error { return nil } -func (S *SQLRepository) getConnection(ctx context.Context) (Transaction, error) { +func (S *SQLRepository) getConnection(ctx context.Context) (SQLDBOperations, error) { //return S.db.Conn(ctx) - if S.con != nil { - return S.con, nil + if S.tx != nil { + return S.tx, nil } return S.db, nil } @@ -220,7 +254,7 @@ func (S *SQLRepository) GetWorker(ctx context.Context, name string) (worker *mod worker, err = S.getWorker(ctx, db, name) return worker, err } -func (S *SQLRepository) getWorker(ctx context.Context, db Transaction, name string) (*model.Worker, error) { +func (S *SQLRepository) getWorker(ctx context.Context, db SQLDBOperations, name string) (*model.Worker, error) { rows, err := db.QueryContext(ctx, "select * from workers where name=$1", name) if err != nil { return nil, err @@ -263,7 +297,7 @@ func (S *SQLRepository) GetJobsByStatus(ctx context.Context, status model.Notifi return S.getJobsByStatus(ctx, conn, status) } -func (S *SQLRepository) getJobsByStatus(ctx context.Context, tx Transaction, statusFilter model.NotificationStatus) ([]*model.Job, error) { +func (S *SQLRepository) getJobsByStatus(ctx context.Context, tx SQLDBOperations, statusFilter model.NotificationStatus) ([]*model.Job, error) { rows, err := tx.QueryContext(ctx, "SELECT v.id, v.source_path,v.source_size, v.target_path,v.target_size,vs.event_time, vs.status, vs.notification_type, vs.message FROM jobs v INNER JOIN job_status vs ON v.id = vs.job_id WHERE vs.status = $1;", statusFilter) if err != nil { return nil, err @@ -302,7 +336,7 @@ func (S *SQLRepository) GetTimeoutJobs(ctx context.Context, timeout time.Duratio return taskEvent, nil } -func (S *SQLRepository) getJob(ctx context.Context, tx Transaction, uuid string) (*model.Job, error) { +func (S *SQLRepository) getJob(ctx context.Context, tx SQLDBOperations, uuid string) (*model.Job, error) { rows, err := tx.QueryContext(ctx, "SELECT id, source_path, source_size, target_path, target_size FROM jobs WHERE id=$1", uuid) if err != nil { return nil, err @@ -344,7 +378,7 @@ func (S *SQLRepository) GetJobs(ctx context.Context) (jobs *[]model.Job, returnE return jobs, err } -func (S *SQLRepository) getJobs(ctx context.Context, tx Transaction) (*[]model.Job, error) { +func (S *SQLRepository) getJobs(ctx context.Context, tx SQLDBOperations) (*[]model.Job, error) { query := fmt.Sprintf(` SELECT v.id, v.source_path,v.source_size, v.target_path,v.target_size, vs.event_time, vs.status, vs.notification_type, vs.message FROM jobs v @@ -366,7 +400,7 @@ func (S *SQLRepository) getJobs(ctx context.Context, tx Transaction) (*[]model.J return &jobs, nil } -func (S *SQLRepository) getTaskEvents(ctx context.Context, tx Transaction, uuid string) ([]*model.TaskEvent, error) { +func (S *SQLRepository) getTaskEvents(ctx context.Context, tx SQLDBOperations, uuid string) ([]*model.TaskEvent, error) { rows, err := tx.QueryContext(ctx, "select * from job_events where job_id=$1 order by event_time asc", uuid) if err != nil { return nil, err @@ -381,7 +415,7 @@ func (S *SQLRepository) getTaskEvents(ctx context.Context, tx Transaction, uuid return taskEvents, nil } -func (S *SQLRepository) getJobStatus(ctx context.Context, tx Transaction, uuid string) (*time.Time, string, model.NotificationType, string, error) { +func (S *SQLRepository) getJobStatus(ctx context.Context, tx SQLDBOperations, uuid string) (*time.Time, string, model.NotificationType, string, error) { var last_update time.Time var status string var statusPhase model.NotificationType @@ -399,7 +433,7 @@ func (S *SQLRepository) getJobStatus(ctx context.Context, tx Transaction, uuid s return &last_update, status, statusPhase, message, nil } -func (S *SQLRepository) getJobByPath(ctx context.Context, tx Transaction, path string) (*model.Job, error) { +func (S *SQLRepository) getJobByPath(ctx context.Context, tx SQLDBOperations, path string) (*model.Job, error) { rows, err := tx.QueryContext(ctx, "select * from jobs where source_path=$1", path) if err != nil { return nil, err @@ -457,7 +491,7 @@ func (S *SQLRepository) AddNewTaskEvent(ctx context.Context, event *model.TaskEv } -func (S *SQLRepository) addNewTaskEvent(ctx context.Context, tx Transaction, event *model.TaskEvent) error { +func (S *SQLRepository) addNewTaskEvent(ctx context.Context, tx SQLDBOperations, event *model.TaskEvent) error { rows, err := tx.QueryContext(ctx, "select max(job_event_id) from job_events where job_id=$1", event.Id.String()) if err != nil { return err @@ -485,7 +519,7 @@ func (S *SQLRepository) AddJob(ctx context.Context, job *model.Job) error { return S.addJob(ctx, conn, job) } -func (S *SQLRepository) addJob(ctx context.Context, tx Transaction, job *model.Job) error { +func (S *SQLRepository) addJob(ctx context.Context, tx SQLDBOperations, job *model.Job) error { _, err := tx.ExecContext(ctx, "INSERT INTO jobs (id, source_path,target_path,source_size,target_size)"+ " VALUES ($1,$2,$3,$4,$5)", job.Id.String(), job.SourcePath, job.TargetPath, job.SourceSize, job.TargetSize) return err @@ -499,12 +533,12 @@ func (S *SQLRepository) UpdateJob(ctx context.Context, job *model.Job) error { return S.updateJob(ctx, conn, job) } -func (S *SQLRepository) updateJob(ctx context.Context, tx Transaction, job *model.Job) error { +func (S *SQLRepository) updateJob(ctx context.Context, tx SQLDBOperations, job *model.Job) error { _, err := tx.ExecContext(ctx, "UPDATE jobs SET source_path=$1, target_path=$2, source_size=$3, target_size=$4 WHERE id=$5", job.SourcePath, job.TargetPath, job.SourceSize, job.TargetSize, job.Id.String()) return err } -func (S *SQLRepository) getTimeoutJobs(ctx context.Context, tx Transaction, timeout time.Duration) ([]*model.TaskEvent, error) { +func (S *SQLRepository) getTimeoutJobs(ctx context.Context, tx SQLDBOperations, timeout time.Duration) ([]*model.TaskEvent, error) { timeoutDate := time.Now().Add(-timeout) timeoutDate.Format(time.RFC3339) @@ -530,26 +564,31 @@ func (S *SQLRepository) WithTransaction(ctx context.Context, transactionFunc fun if err != nil { return err } - txRepository := *S - txRepository.con = &SQLTransaction{ - tx: sqlTx, - } - - err = transactionFunc(ctx, &txRepository) - if err != nil { - if errTx := sqlTx.Rollback(); errTx != nil { - return errTx + defer func() { + if p := recover(); p != nil { + err := sqlTx.Rollback() + if err != nil { + log.Error(p) + } + log.Panic(p) + } else if err != nil { + log.Debugf("Rollback SQLDBOperations %v", sqlTx) + err = sqlTx.Rollback() + if err != nil { + log.Error(err) + } + } else { + if err = sqlTx.Commit(); err != nil { + log.Error(err) + } } - return err - } - if err = sqlTx.Commit(); err != nil { - return err - } + }() - return nil + txRepository := SQLRepository{tx: &SQLTransaction{sqlTx}} + return transactionFunc(ctx, &txRepository) } -func (S *SQLRepository) queuedJob(ctx context.Context, tx Transaction) (*model.Job, error) { +func (S *SQLRepository) queuedJob(ctx context.Context, tx SQLDBOperations) (*model.Job, error) { rows, err := tx.QueryContext(ctx, "select job_id, job_event_id from job_status where notification_type='Job' and status='queued' order by event_time asc limit 1") //2020-05-17 20:50:41.428531 +00:00 diff --git a/server/scheduler/scheduler.go b/server/scheduler/scheduler.go index b515d54..495872d 100644 --- a/server/scheduler/scheduler.go +++ b/server/scheduler/scheduler.go @@ -83,9 +83,9 @@ func (R *RuntimeScheduler) RequestJob(ctx context.Context, workerName string) (* } func (R *RuntimeScheduler) HandleWorkerEvent(ctx context.Context, jobEvent *model.TaskEvent) error { - R.handleEventMu.Lock() - defer R.handleEventMu.Unlock() - if err := R.repo.ProcessEvent(ctx, jobEvent); err != nil { + //R.handleEventMu.Lock() + //defer R.handleEventMu.Unlock() + if err := R.processEvent(ctx, jobEvent); err != nil { return err } @@ -97,6 +97,20 @@ func (R *RuntimeScheduler) HandleWorkerEvent(ctx context.Context, jobEvent *mode return nil } +func (R *RuntimeScheduler) processEvent(ctx context.Context, taskEvent *model.TaskEvent) error { + var err error + switch taskEvent.EventType { + case model.PingEvent: + err = R.repo.PingServerUpdate(ctx, taskEvent.WorkerName, taskEvent.IP) + case model.NotificationEvent: + err = R.repo.AddNewTaskEvent(ctx, taskEvent) + default: + err = fmt.Errorf("unknown event type %s", taskEvent.EventType) + } + + return err +} + func (R *RuntimeScheduler) completeJob(ctx context.Context, jobEvent *model.TaskEvent) error { video, err := R.repo.GetJob(ctx, jobEvent.Id.String()) if err != nil { @@ -419,7 +433,7 @@ func (R *RuntimeScheduler) GetChecksum(ctx context.Context, uuid string) (string return checksum, nil } -func (S *RuntimeScheduler) stop() { +func (R *RuntimeScheduler) stop() { }