Skip to content

Commit

Permalink
more logs
Browse files Browse the repository at this point in the history
  • Loading branch information
iaymerich committed Jan 9, 2025
1 parent 8ce8c7d commit 3b16725
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 57 deletions.
145 changes: 92 additions & 53 deletions server/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ var (
)

type Repository interface {
getConnection(ctx context.Context) (Transaction, error)
getConnection(ctx context.Context) (SQLDBOperations, error)
Initialize(ctx context.Context) error
ProcessEvent(ctx context.Context, event *model.TaskEvent) error
PingServerUpdate(ctx context.Context, name string, ip string) error
GetTimeoutJobs(ctx context.Context, timeout time.Duration) ([]*model.TaskEvent, error)
GetJobs(ctx context.Context) (*[]model.Job, error)
Expand All @@ -34,7 +33,7 @@ type Repository interface {
RetrieveQueuedJob(ctx context.Context) (*model.Job, error)
}

type Transaction interface {
type SQLDBOperations interface {
Exec(query string, args ...interface{}) (sql.Result, error)
Prepare(query string) (*sql.Stmt, error)
Query(query string, args ...interface{}) (*sql.Rows, error)
Expand All @@ -48,33 +47,82 @@ type SQLTransaction struct {
}

func (S *SQLTransaction) Exec(query string, args ...interface{}) (sql.Result, error) {
log.Debugf("Exec: %s, args: %v", query, args)
return S.tx.Exec(query, args...)

}

func (S *SQLTransaction) Prepare(query string) (*sql.Stmt, error) {
log.Debugf("Prepare: %s", query)
return S.tx.Prepare(query)
}

func (S *SQLTransaction) Query(query string, args ...interface{}) (*sql.Rows, error) {
log.Debugf("Query: %s, args: %v", query, args)
return S.tx.Query(query, args...)
}

func (S *SQLTransaction) QueryRow(query string, args ...interface{}) *sql.Row {
log.Debugf("QueryRow: %s, args: %v", query, args)
return S.tx.QueryRow(query, args...)
}

func (S *SQLTransaction) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
log.Debugf("QueryContext: %s, args: %v", query, args)
return S.tx.QueryContext(ctx, query, args...)
}

func (S *SQLTransaction) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
log.Debugf("ExecContext: %s, args: %v", query, args)
return S.tx.ExecContext(ctx, query, args...)
}

type SQLDatabase struct {
db *sql.DB
}

func (S *SQLDatabase) Exec(query string, args ...interface{}) (sql.Result, error) {
log.Debugf("Exec query: %s, args: %v", query, args)
return S.db.Exec(query, args...)
}

func (S *SQLDatabase) Prepare(query string) (*sql.Stmt, error) {
log.Debugf("Prepare: %s", query)
return S.db.Prepare(query)
}

func (S *SQLDatabase) Query(query string, args ...interface{}) (*sql.Rows, error) {
log.Debugf("Query: %s, args: %v", query, args)
return S.db.Query(query, args...)
}

func (S *SQLDatabase) QueryRow(query string, args ...interface{}) *sql.Row {
log.Debugf("QueryRow: %s, args: %v", query, args)
return S.db.QueryRow(query, args...)
}

func (S *SQLDatabase) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
log.Debugf("QueryContext: %s, args: %v", query, args)
return S.db.QueryContext(ctx, query, args...)
}

func (S *SQLDatabase) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
log.Debugf("ExecContext: %s, args: %v", query, args)
return S.db.ExecContext(ctx, query, args...)
}

func (S *SQLDatabase) BeginTx(ctx context.Context, s *sql.TxOptions) (*sql.Tx, error) {
log.Debugf("BeginTx")
tx, err := S.db.BeginTx(ctx, s)
if err != nil {
return nil, err
}
return tx, nil
}

type SQLRepository struct {
db *sql.DB
con Transaction
db *SQLDatabase
tx SQLDBOperations
}

type SQLServerConfig struct {
Expand All @@ -95,14 +143,15 @@ func NewSQLRepository(config SQLServerConfig) (*SQLRepository, error) {
db.SetMaxOpenConns(5)
db.SetConnMaxLifetime(0)
db.SetMaxIdleConns(5)

/* go func(){
for {
fmt.Printf("In use %d not use %d open %d wait %d\n",db.Stats().Idle, db.Stats().InUse, db.Stats().OpenConnections,db.Stats().WaitCount)
time.Sleep(time.Second*5)
}
}()*/
return &SQLRepository{
db: db,
db: &SQLDatabase{db},
}, nil

}
Expand All @@ -111,17 +160,6 @@ func (S *SQLRepository) Initialize(ctx context.Context) error {
return S.prepareDatabase(ctx)
}

func (S *SQLRepository) ProcessEvent(ctx context.Context, taskEvent *model.TaskEvent) error {
switch taskEvent.EventType {
case model.PingEvent:
return S.PingServerUpdate(ctx, taskEvent.WorkerName, taskEvent.IP)
case model.NotificationEvent:
return S.AddNewTaskEvent(ctx, taskEvent)
default:
return fmt.Errorf("unknown event type %s", taskEvent.EventType)
}
}

//go:embed resources/database/*.sql
var databaseSchemas embed.FS

Expand All @@ -134,12 +172,8 @@ func (S *SQLRepository) prepareDatabase(ctx context.Context) error {
}

// Ensure `schema_version` table exists
const createTableSchemaVersion = `
CREATE TABLE IF NOT EXISTS schema_version (
version INT PRIMARY KEY,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
`
const createTableSchemaVersion = `CREATE TABLE IF NOT EXISTS schema_version (version INT PRIMARY KEY, applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP );`

_, err = con.ExecContext(ctx, createTableSchemaVersion)
if err != nil {
return fmt.Errorf("failed to ensure schema_version table: %w", err)
Expand Down Expand Up @@ -205,10 +239,10 @@ func (S *SQLRepository) prepareDatabase(ctx context.Context) error {
return nil
}

func (S *SQLRepository) getConnection(ctx context.Context) (Transaction, error) {
func (S *SQLRepository) getConnection(ctx context.Context) (SQLDBOperations, error) {
//return S.db.Conn(ctx)
if S.con != nil {
return S.con, nil
if S.tx != nil {
return S.tx, nil
}
return S.db, nil
}
Expand All @@ -220,7 +254,7 @@ func (S *SQLRepository) GetWorker(ctx context.Context, name string) (worker *mod
worker, err = S.getWorker(ctx, db, name)
return worker, err
}
func (S *SQLRepository) getWorker(ctx context.Context, db Transaction, name string) (*model.Worker, error) {
func (S *SQLRepository) getWorker(ctx context.Context, db SQLDBOperations, name string) (*model.Worker, error) {
rows, err := db.QueryContext(ctx, "select * from workers where name=$1", name)
if err != nil {
return nil, err
Expand Down Expand Up @@ -263,7 +297,7 @@ func (S *SQLRepository) GetJobsByStatus(ctx context.Context, status model.Notifi
return S.getJobsByStatus(ctx, conn, status)
}

func (S *SQLRepository) getJobsByStatus(ctx context.Context, tx Transaction, statusFilter model.NotificationStatus) ([]*model.Job, error) {
func (S *SQLRepository) getJobsByStatus(ctx context.Context, tx SQLDBOperations, statusFilter model.NotificationStatus) ([]*model.Job, error) {
rows, err := tx.QueryContext(ctx, "SELECT v.id, v.source_path,v.source_size, v.target_path,v.target_size,vs.event_time, vs.status, vs.notification_type, vs.message FROM jobs v INNER JOIN job_status vs ON v.id = vs.job_id WHERE vs.status = $1;", statusFilter)
if err != nil {
return nil, err
Expand Down Expand Up @@ -302,7 +336,7 @@ func (S *SQLRepository) GetTimeoutJobs(ctx context.Context, timeout time.Duratio
return taskEvent, nil
}

func (S *SQLRepository) getJob(ctx context.Context, tx Transaction, uuid string) (*model.Job, error) {
func (S *SQLRepository) getJob(ctx context.Context, tx SQLDBOperations, uuid string) (*model.Job, error) {
rows, err := tx.QueryContext(ctx, "SELECT id, source_path, source_size, target_path, target_size FROM jobs WHERE id=$1", uuid)
if err != nil {
return nil, err
Expand Down Expand Up @@ -344,7 +378,7 @@ func (S *SQLRepository) GetJobs(ctx context.Context) (jobs *[]model.Job, returnE
return jobs, err
}

func (S *SQLRepository) getJobs(ctx context.Context, tx Transaction) (*[]model.Job, error) {
func (S *SQLRepository) getJobs(ctx context.Context, tx SQLDBOperations) (*[]model.Job, error) {
query := fmt.Sprintf(`
SELECT v.id, v.source_path,v.source_size, v.target_path,v.target_size, vs.event_time, vs.status, vs.notification_type, vs.message
FROM jobs v
Expand All @@ -366,7 +400,7 @@ func (S *SQLRepository) getJobs(ctx context.Context, tx Transaction) (*[]model.J
return &jobs, nil
}

func (S *SQLRepository) getTaskEvents(ctx context.Context, tx Transaction, uuid string) ([]*model.TaskEvent, error) {
func (S *SQLRepository) getTaskEvents(ctx context.Context, tx SQLDBOperations, uuid string) ([]*model.TaskEvent, error) {
rows, err := tx.QueryContext(ctx, "select * from job_events where job_id=$1 order by event_time asc", uuid)
if err != nil {
return nil, err
Expand All @@ -381,7 +415,7 @@ func (S *SQLRepository) getTaskEvents(ctx context.Context, tx Transaction, uuid
return taskEvents, nil
}

func (S *SQLRepository) getJobStatus(ctx context.Context, tx Transaction, uuid string) (*time.Time, string, model.NotificationType, string, error) {
func (S *SQLRepository) getJobStatus(ctx context.Context, tx SQLDBOperations, uuid string) (*time.Time, string, model.NotificationType, string, error) {
var last_update time.Time
var status string
var statusPhase model.NotificationType
Expand All @@ -399,7 +433,7 @@ func (S *SQLRepository) getJobStatus(ctx context.Context, tx Transaction, uuid s
return &last_update, status, statusPhase, message, nil
}

func (S *SQLRepository) getJobByPath(ctx context.Context, tx Transaction, path string) (*model.Job, error) {
func (S *SQLRepository) getJobByPath(ctx context.Context, tx SQLDBOperations, path string) (*model.Job, error) {
rows, err := tx.QueryContext(ctx, "select * from jobs where source_path=$1", path)
if err != nil {
return nil, err
Expand Down Expand Up @@ -457,7 +491,7 @@ func (S *SQLRepository) AddNewTaskEvent(ctx context.Context, event *model.TaskEv

}

func (S *SQLRepository) addNewTaskEvent(ctx context.Context, tx Transaction, event *model.TaskEvent) error {
func (S *SQLRepository) addNewTaskEvent(ctx context.Context, tx SQLDBOperations, event *model.TaskEvent) error {
rows, err := tx.QueryContext(ctx, "select max(job_event_id) from job_events where job_id=$1", event.Id.String())
if err != nil {
return err
Expand Down Expand Up @@ -485,7 +519,7 @@ func (S *SQLRepository) AddJob(ctx context.Context, job *model.Job) error {
return S.addJob(ctx, conn, job)
}

func (S *SQLRepository) addJob(ctx context.Context, tx Transaction, job *model.Job) error {
func (S *SQLRepository) addJob(ctx context.Context, tx SQLDBOperations, job *model.Job) error {
_, err := tx.ExecContext(ctx, "INSERT INTO jobs (id, source_path,target_path,source_size,target_size)"+
" VALUES ($1,$2,$3,$4,$5)", job.Id.String(), job.SourcePath, job.TargetPath, job.SourceSize, job.TargetSize)
return err
Expand All @@ -499,12 +533,12 @@ func (S *SQLRepository) UpdateJob(ctx context.Context, job *model.Job) error {
return S.updateJob(ctx, conn, job)
}

func (S *SQLRepository) updateJob(ctx context.Context, tx Transaction, job *model.Job) error {
func (S *SQLRepository) updateJob(ctx context.Context, tx SQLDBOperations, job *model.Job) error {
_, err := tx.ExecContext(ctx, "UPDATE jobs SET source_path=$1, target_path=$2, source_size=$3, target_size=$4 WHERE id=$5", job.SourcePath, job.TargetPath, job.SourceSize, job.TargetSize, job.Id.String())
return err
}

func (S *SQLRepository) getTimeoutJobs(ctx context.Context, tx Transaction, timeout time.Duration) ([]*model.TaskEvent, error) {
func (S *SQLRepository) getTimeoutJobs(ctx context.Context, tx SQLDBOperations, timeout time.Duration) ([]*model.TaskEvent, error) {
timeoutDate := time.Now().Add(-timeout)
timeoutDate.Format(time.RFC3339)

Expand All @@ -530,26 +564,31 @@ func (S *SQLRepository) WithTransaction(ctx context.Context, transactionFunc fun
if err != nil {
return err
}
txRepository := *S
txRepository.con = &SQLTransaction{
tx: sqlTx,
}

err = transactionFunc(ctx, &txRepository)
if err != nil {
if errTx := sqlTx.Rollback(); errTx != nil {
return errTx
defer func() {
if p := recover(); p != nil {
err := sqlTx.Rollback()
if err != nil {
log.Error(p)
}
log.Panic(p)
} else if err != nil {
log.Debugf("Rollback SQLDBOperations %v", sqlTx)
err = sqlTx.Rollback()
if err != nil {
log.Error(err)
}
} else {
if err = sqlTx.Commit(); err != nil {
log.Error(err)
}
}
return err
}
if err = sqlTx.Commit(); err != nil {
return err
}
}()

return nil
txRepository := SQLRepository{tx: &SQLTransaction{sqlTx}}
return transactionFunc(ctx, &txRepository)
}

func (S *SQLRepository) queuedJob(ctx context.Context, tx Transaction) (*model.Job, error) {
func (S *SQLRepository) queuedJob(ctx context.Context, tx SQLDBOperations) (*model.Job, error) {
rows, err := tx.QueryContext(ctx, "select job_id, job_event_id from job_status where notification_type='Job' and status='queued' order by event_time asc limit 1")

//2020-05-17 20:50:41.428531 +00:00
Expand Down
22 changes: 18 additions & 4 deletions server/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ func (R *RuntimeScheduler) RequestJob(ctx context.Context, workerName string) (*
}

func (R *RuntimeScheduler) HandleWorkerEvent(ctx context.Context, jobEvent *model.TaskEvent) error {
R.handleEventMu.Lock()
defer R.handleEventMu.Unlock()
if err := R.repo.ProcessEvent(ctx, jobEvent); err != nil {
//R.handleEventMu.Lock()
//defer R.handleEventMu.Unlock()
if err := R.processEvent(ctx, jobEvent); err != nil {
return err
}

Expand All @@ -97,6 +97,20 @@ func (R *RuntimeScheduler) HandleWorkerEvent(ctx context.Context, jobEvent *mode
return nil
}

func (R *RuntimeScheduler) processEvent(ctx context.Context, taskEvent *model.TaskEvent) error {
var err error
switch taskEvent.EventType {
case model.PingEvent:
err = R.repo.PingServerUpdate(ctx, taskEvent.WorkerName, taskEvent.IP)
case model.NotificationEvent:
err = R.repo.AddNewTaskEvent(ctx, taskEvent)
default:
err = fmt.Errorf("unknown event type %s", taskEvent.EventType)
}

return err
}

func (R *RuntimeScheduler) completeJob(ctx context.Context, jobEvent *model.TaskEvent) error {
video, err := R.repo.GetJob(ctx, jobEvent.Id.String())
if err != nil {
Expand Down Expand Up @@ -419,7 +433,7 @@ func (R *RuntimeScheduler) GetChecksum(ctx context.Context, uuid string) (string
return checksum, nil
}

func (S *RuntimeScheduler) stop() {
func (R *RuntimeScheduler) stop() {

}

Expand Down

0 comments on commit 3b16725

Please sign in to comment.