diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 638aab7d..22006702 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -450,7 +450,7 @@ func main() { s.Every(1).Day().At("4:00").Do(func() { startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600) - jobs, err := jobRepo.FindJobsBefore(startTime) + jobs, err := jobRepo.FindJobsBetween(0, startTime) if err != nil { log.Warnf("Error while looking for retention jobs: %s", err.Error()) } @@ -473,7 +473,7 @@ func main() { s.Every(1).Day().At("4:00").Do(func() { startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600) - jobs, err := jobRepo.FindJobsBefore(startTime) + jobs, err := jobRepo.FindJobsBetween(0, startTime) if err != nil { log.Warnf("Error while looking for retention jobs: %s", err.Error()) } @@ -497,12 +497,14 @@ func main() { log.Info("Register compression service") s.Every(1).Day().At("5:00").Do(func() { + ar := archive.GetHandle() startTime := time.Now().Unix() - int64(cfg.Compression*24*3600) - jobs, err := jobRepo.FindJobsBefore(startTime) + lastTime := ar.CompressLast(startTime) + jobs, err := jobRepo.FindJobsBetween(lastTime, startTime) if err != nil { log.Warnf("Error while looking for retention jobs: %s", err.Error()) } - archive.GetHandle().Compress(jobs) + ar.Compress(jobs) }) } diff --git a/internal/repository/job.go b/internal/repository/job.go index ece960a3..9ae7c1ec 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -712,18 +712,22 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error { return nil } -func (r *JobRepository) FindJobsBefore(startTime int64) ([]*schema.Job, error) { +func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) { - query := sq.Select(jobColumns...).From("job").Where(fmt.Sprintf( - "job.start_time < %d", startTime)) + var query sq.SelectBuilder - sql, args, err := query.ToSql() - if err != nil { - log.Warn("Error while converting query to sql") - return nil, err + if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd { + return nil, errors.New("startTimeBegin is equal or larger startTimeEnd") + } + + if startTimeBegin == 0 { + query = sq.Select(jobColumns...).From("job").Where(fmt.Sprintf( + "job.start_time < %d", startTimeEnd)) + } else { + query = sq.Select(jobColumns...).From("job").Where(fmt.Sprintf( + "job.start_time BETWEEN %d AND %d", startTimeBegin, startTimeEnd)) } - log.Debugf("SQL query: `%s`, args: %#v", sql, args) rows, err := query.RunWith(r.stmtCache).Query() if err != nil { log.Error("Error while running query") diff --git a/internal/repository/testdata/job.db b/internal/repository/testdata/job.db index c07fb326..9b37f841 100644 Binary files a/internal/repository/testdata/job.db and b/internal/repository/testdata/job.db differ diff --git a/internal/repository/testdata/job.db-shm b/internal/repository/testdata/job.db-shm new file mode 100644 index 00000000..fe9ac284 Binary files /dev/null and b/internal/repository/testdata/job.db-shm differ diff --git a/internal/repository/testdata/job.db-wal b/internal/repository/testdata/job.db-wal new file mode 100644 index 00000000..e69de29b diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 17211c9b..beeb24d6 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -42,6 +42,8 @@ type ArchiveBackend interface { Compress(jobs []*schema.Job) + CompressLast(starttime int64) int64 + Iter(loadMetricData bool) <-chan JobContainer } diff --git a/pkg/archive/fsBackend.go b/pkg/archive/fsBackend.go index 3825821e..f0a04770 100644 --- a/pkg/archive/fsBackend.go +++ b/pkg/archive/fsBackend.go @@ -36,6 +36,13 @@ type FsArchive struct { clusters []string } +type clusterInfo struct { + numJobs int + dateFirst int64 + dateLast int64 + diskSize float64 +} + func getDirectory( job *schema.Job, rootPath string, @@ -124,7 +131,7 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) { b, err := os.ReadFile(filepath.Join(fsa.path, "version.txt")) if err != nil { - fmt.Println("Err") + log.Warnf("fsBackend Init() - %v", err) return 0, err } @@ -154,13 +161,6 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) { return version, nil } -type clusterInfo struct { - numJobs int - dateFirst int64 - dateLast int64 - diskSize float64 -} - func (fsa *FsArchive) Info() { fmt.Printf("Job archive %s\n", fsa.path) clusters, err := os.ReadDir(fsa.path) @@ -324,6 +324,7 @@ func (fsa *FsArchive) Move(jobs []*schema.Job, path string) { } func (fsa *FsArchive) CleanUp(jobs []*schema.Job) { + start := time.Now() for _, job := range jobs { dir := getDirectory(job, fsa.path) if err := os.RemoveAll(dir); err != nil { @@ -337,15 +338,41 @@ func (fsa *FsArchive) CleanUp(jobs []*schema.Job) { } } } + + log.Infof("Retention Service - Remove %d files in %s", len(jobs), time.Since(start)) } func (fsa *FsArchive) Compress(jobs []*schema.Job) { + var cnt int + start := time.Now() + for _, job := range jobs { fileIn := getPath(job, fsa.path, "data.json") if !util.CheckFileExists(fileIn) && util.GetFilesize(fileIn) > 2000 { util.CompressFile(fileIn, getPath(job, fsa.path, "data.json.gz")) + cnt++ } } + + log.Infof("Compression Service - %d files took %s", cnt, time.Since(start)) +} + +func (fsa *FsArchive) CompressLast(starttime int64) int64 { + + filename := filepath.Join(fsa.path, "compress.txt") + b, err := os.ReadFile(filename) + if err != nil { + log.Errorf("fsBackend Compress - %v", err) + return starttime + } + last, err := strconv.ParseInt(strings.TrimSuffix(string(b), "\n"), 10, 64) + if err != nil { + log.Errorf("fsBackend Compress - %v", err) + return starttime + } + + os.WriteFile(filename, []byte(fmt.Sprintf("%d", starttime)), 0644) + return last } func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) { @@ -476,7 +503,6 @@ func (fsa *FsArchive) StoreJobMeta(jobMeta *schema.JobMeta) error { } func (fsa *FsArchive) GetClusters() []string { - return fsa.clusters }