Skip to content

Commit

Permalink
Merge pull request ClusterCockpit#126 from ClusterCockpit/hotfix
Browse files Browse the repository at this point in the history
Improve Compression Service
  • Loading branch information
moebiusband73 authored Jun 10, 2023
2 parents 05ec0a4 + 911dcb6 commit eae4f38
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 21 deletions.
10 changes: 6 additions & 4 deletions cmd/cc-backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func main() {

s.Every(1).Day().At("4:00").Do(func() {
startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600)
jobs, err := jobRepo.FindJobsBefore(startTime)
jobs, err := jobRepo.FindJobsBetween(0, startTime)
if err != nil {
log.Warnf("Error while looking for retention jobs: %s", err.Error())
}
Expand All @@ -473,7 +473,7 @@ func main() {

s.Every(1).Day().At("4:00").Do(func() {
startTime := time.Now().Unix() - int64(cfg.Retention.Age*24*3600)
jobs, err := jobRepo.FindJobsBefore(startTime)
jobs, err := jobRepo.FindJobsBetween(0, startTime)
if err != nil {
log.Warnf("Error while looking for retention jobs: %s", err.Error())
}
Expand All @@ -497,12 +497,14 @@ func main() {
log.Info("Register compression service")

s.Every(1).Day().At("5:00").Do(func() {
ar := archive.GetHandle()
startTime := time.Now().Unix() - int64(cfg.Compression*24*3600)
jobs, err := jobRepo.FindJobsBefore(startTime)
lastTime := ar.CompressLast(startTime)
jobs, err := jobRepo.FindJobsBetween(lastTime, startTime)
if err != nil {
log.Warnf("Error while looking for retention jobs: %s", err.Error())
}
archive.GetHandle().Compress(jobs)
ar.Compress(jobs)
})
}

Expand Down
20 changes: 12 additions & 8 deletions internal/repository/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,18 +712,22 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
return nil
}

func (r *JobRepository) FindJobsBefore(startTime int64) ([]*schema.Job, error) {
func (r *JobRepository) FindJobsBetween(startTimeBegin int64, startTimeEnd int64) ([]*schema.Job, error) {

query := sq.Select(jobColumns...).From("job").Where(fmt.Sprintf(
"job.start_time < %d", startTime))
var query sq.SelectBuilder

sql, args, err := query.ToSql()
if err != nil {
log.Warn("Error while converting query to sql")
return nil, err
if startTimeBegin == startTimeEnd || startTimeBegin > startTimeEnd {
return nil, errors.New("startTimeBegin is equal or larger startTimeEnd")
}

if startTimeBegin == 0 {
query = sq.Select(jobColumns...).From("job").Where(fmt.Sprintf(
"job.start_time < %d", startTimeEnd))
} else {
query = sq.Select(jobColumns...).From("job").Where(fmt.Sprintf(
"job.start_time BETWEEN %d AND %d", startTimeBegin, startTimeEnd))
}

log.Debugf("SQL query: `%s`, args: %#v", sql, args)
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
log.Error("Error while running query")
Expand Down
Binary file modified internal/repository/testdata/job.db
Binary file not shown.
Binary file added internal/repository/testdata/job.db-shm
Binary file not shown.
Empty file.
2 changes: 2 additions & 0 deletions pkg/archive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type ArchiveBackend interface {

Compress(jobs []*schema.Job)

CompressLast(starttime int64) int64

Iter(loadMetricData bool) <-chan JobContainer
}

Expand Down
44 changes: 35 additions & 9 deletions pkg/archive/fsBackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ type FsArchive struct {
clusters []string
}

type clusterInfo struct {
numJobs int
dateFirst int64
dateLast int64
diskSize float64
}

func getDirectory(
job *schema.Job,
rootPath string,
Expand Down Expand Up @@ -124,7 +131,7 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {

b, err := os.ReadFile(filepath.Join(fsa.path, "version.txt"))
if err != nil {
fmt.Println("Err")
log.Warnf("fsBackend Init() - %v", err)
return 0, err
}

Expand Down Expand Up @@ -154,13 +161,6 @@ func (fsa *FsArchive) Init(rawConfig json.RawMessage) (uint64, error) {
return version, nil
}

type clusterInfo struct {
numJobs int
dateFirst int64
dateLast int64
diskSize float64
}

func (fsa *FsArchive) Info() {
fmt.Printf("Job archive %s\n", fsa.path)
clusters, err := os.ReadDir(fsa.path)
Expand Down Expand Up @@ -324,6 +324,7 @@ func (fsa *FsArchive) Move(jobs []*schema.Job, path string) {
}

func (fsa *FsArchive) CleanUp(jobs []*schema.Job) {
start := time.Now()
for _, job := range jobs {
dir := getDirectory(job, fsa.path)
if err := os.RemoveAll(dir); err != nil {
Expand All @@ -337,15 +338,41 @@ func (fsa *FsArchive) CleanUp(jobs []*schema.Job) {
}
}
}

log.Infof("Retention Service - Remove %d files in %s", len(jobs), time.Since(start))
}

func (fsa *FsArchive) Compress(jobs []*schema.Job) {
var cnt int
start := time.Now()

for _, job := range jobs {
fileIn := getPath(job, fsa.path, "data.json")
if !util.CheckFileExists(fileIn) && util.GetFilesize(fileIn) > 2000 {
util.CompressFile(fileIn, getPath(job, fsa.path, "data.json.gz"))
cnt++
}
}

log.Infof("Compression Service - %d files took %s", cnt, time.Since(start))
}

func (fsa *FsArchive) CompressLast(starttime int64) int64 {

filename := filepath.Join(fsa.path, "compress.txt")
b, err := os.ReadFile(filename)
if err != nil {
log.Errorf("fsBackend Compress - %v", err)
return starttime
}
last, err := strconv.ParseInt(strings.TrimSuffix(string(b), "\n"), 10, 64)
if err != nil {
log.Errorf("fsBackend Compress - %v", err)
return starttime
}

os.WriteFile(filename, []byte(fmt.Sprintf("%d", starttime)), 0644)
return last
}

func (fsa *FsArchive) LoadJobData(job *schema.Job) (schema.JobData, error) {
Expand Down Expand Up @@ -476,7 +503,6 @@ func (fsa *FsArchive) StoreJobMeta(jobMeta *schema.JobMeta) error {
}

func (fsa *FsArchive) GetClusters() []string {

return fsa.clusters
}

Expand Down

0 comments on commit eae4f38

Please sign in to comment.