Skip to content

Commit

Permalink
Multidomain (#1)
Browse files Browse the repository at this point in the history
* per domain task scheduling
  • Loading branch information
Kycklingar authored Jan 27, 2020
1 parent c4fee70 commit 6d5e13b
Show file tree
Hide file tree
Showing 36 changed files with 507 additions and 265 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#pultimad

Heavy duty yiff.party archiver.
52 changes: 52 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package config

import (
"encoding/json"
"errors"
"io"
"os"
)

var (
ErrInvalid = errInvalid()
)

func errInvalid() error {
return errors.New("Invalid config type")
}

type Config interface {
Default() Config
}

func Load(filename string, out Config) error {
f, err := os.Open(filename)
if err != nil {
return err
}
defer f.Close()

return Parse(f, out)
}

func Parse(input io.Reader, c Config) error {
dec := json.NewDecoder(input)
err := dec.Decode(&c)
if err != nil {
return err
}

return nil
}

func Write(filename string, cfg Config) error {
f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return err
}
defer f.Close()

enc := json.NewEncoder(f)
enc.SetIndent("", " ")
return enc.Encode(cfg)
}
61 changes: 61 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package config

import (
"strings"
"testing"
)

func expected(t *testing.T, exp, got string) {
t.Fatalf("Expected %s, got %s", exp, got)
}

type cconfig struct {
Hello string
Darling string
}

func (c cconfig) Default() Config {
return c
}

func TestCustomConfig(t *testing.T) {
var cc cconfig
cc.Hello = "Worldstar"
cc.Darling = "Merry"

ss := strings.NewReader("{\"Hello\":\"World\"}")
err := Parse(ss, &cc)
if err != nil {
t.Fatal(err)
}

if cc.Hello != "World" {
expected(t, "World", cc.Hello)
}

if cc.Darling != "Merry" {
expected(t, "Merry", cc.Darling)
}
}

func TestLoad(t *testing.T) {
var cfg cconfig
err := Load("test.cfg", &cfg)
if err != nil {
t.Fatal(err)
}

if cfg.Hello != "World!" {
expected(t, "World", cfg.Hello)
}
}

func TestWrite(t *testing.T) {
var cfg cconfig
cfg.Hello = "Oxy"
cfg.Darling = "Daisy"

if err := Write("testWrite.cfg", cfg); err != nil {
t.Fatal(err)
}
}
160 changes: 19 additions & 141 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
@@ -1,164 +1,42 @@
package daemon

import (
"fmt"
"log"
"sync"
"os"
"time"

yp "github.com/kycklingar/pultimad/parser"
db "github.com/kycklingar/pultimad/storage"
)

const downloadFiles = true

func NewDaemon(connstr string) (*daemon, error) {
func NewDaemon() *daemon {
var d = new(daemon)
var err error
d.b, err = db.Connect(connstr)
if err != nil {
return nil, err
}

d.q = new(queue)
d.q.l = new(sync.Mutex)

creators, err := d.b.LoadCreators()
if err != nil {
return nil, err
}

yp.PopulateCreators(creators)

d.downloadPath = "download"

return d, nil
return d
}

type daemon struct {
q *queue
b *db.DB
downloadPath string
domains []domain
}

func (d *daemon) ReloadCreators() error {
err := yp.LoadCreators()
if err != nil {
return err
func (d *daemon) RegisterDomain(name string, checker Checker, sleepTime time.Duration) {
var dom = domain{
checker: checker,
name: name,
queue: newQueue(),
sleepTime: sleepTime,
}

for k, v := range yp.Creators2 {
var c = new(yp.Creator)
c.ID = k
c.Name = v
err = d.b.StoreCreator(c)
if err != nil {
return err
}
}

return nil
d.domains = append(d.domains, dom)
}

func (d *daemon) QueueCreator(id int) {
d.q.push(task{creatorTask, id})
}

func (d *daemon) checkForNewJobs() {
if downloadFiles {
files, err := d.b.GetFiles(5)
if err != nil {
log.Println(err)
} else {
for _, file := range files {
d.q.push(task{fileTask, file})
}
}
}

creators, err := d.b.GetCreators(5)
if err != nil {
log.Println(err)
} else {
for _, creator := range creators {
d.q.push(task{postsTask, creator})
}
func (d *daemon) Loop(quit chan os.Signal) {
for _, dom := range d.domains {
go dom.process()
}

<-quit
d.cleanup()
}

func (d *daemon) Loop() {
for {
time.Sleep(time.Second * 2)
tas := d.q.pop()
switch tas.ttype {
case sharedFilesTask:
fmt.Print("Grabing shared files from ")
creator, ok := tas.data.(*yp.Creator)
if !ok {
log.Println("task data not of type *yp.Creator", tas, tas.data)
break
}

fmt.Println(creator.Name)
files, err := creator.SharedFiles()
if err != nil {
log.Println(err)
break
}

for _, file := range files {
if err = d.b.StoreSharedFile(file); err != nil {
log.Println(err)
}
}

err = d.b.CheckCreator(creator)
if err != nil {
log.Println(err)
}
case postsTask:
fmt.Print("Grabing posts from ")
creator, ok := tas.data.(*yp.Creator)
if !ok {
log.Println("task data not of type *yp.Creator", tas, tas.data)
break
}

fmt.Println(creator.Name)

posts, err := creator.Next()
if err != nil {
log.Println(err)
break
}

if posts == nil {
d.q.push(task{sharedFilesTask, creator})
break
}

for _, post := range posts {
if err = d.b.StorePost(post); err != nil {
log.Println(err)
}
}
d.q.push(tas)
case fileTask:
fmt.Print("Downloading file ")
file, ok := tas.data.(db.File)
if !ok {
log.Println("task data not of type db.File")
break
}
fmt.Printf("[%s] %s\n", file.Creator, file.FileURL)
if err := d.download(file); err != nil {
d.b.Tried(file)
log.Println(err)
}
default:
d.checkForNewJobs()
time.Sleep(time.Second * 5)
}
func (d *daemon) cleanup() {
for _, dom := range d.domains {
dom.quit()
}
}
55 changes: 55 additions & 0 deletions daemon/domain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package daemon

import (
"fmt"
"time"

"github.com/kycklingar/pultimad/config"
)

type Checker interface {
Init(config.Config) error
Check() []Taskif
Quit()
}

type domain struct {
checker Checker
name string
queue *queue

sleepTime time.Duration

q bool
}

func (dom *domain) quit() {
dom.q = true
dom.checker.Quit()
}

func (dom *domain) process() {
for !dom.q {
task := dom.queue.pop()
if task != nil {
fmt.Printf("[%s] %s\n", task.Domain(), task.Description())
newTasks := task.Do()
if newTasks != nil {
dom.queue.push(newTasks)
}
} else if !dom.check() {
time.Sleep(time.Second * 10)
}

time.Sleep(dom.sleepTime)
}
}

func (dom *domain) check() bool {
tasks := dom.checker.Check()
if tasks != nil {
dom.queue.push(tasks)
}

return len(tasks) > 0
}
33 changes: 0 additions & 33 deletions daemon/download.go

This file was deleted.

Loading

0 comments on commit 6d5e13b

Please sign in to comment.