Skip to content

Concurent Sub/Unsub pattern for Golang. Can be used as an alternative to channels

License

Notifications You must be signed in to change notification settings

jenchik/listener

Repository files navigation

listener

Concurent Sub/Unsub pattern for Golang. Can be used as an alternative to channels

Build Status Go Report Card

Installation

go get github.com/jenchik/listener

Examples

with waiting

package main

import (
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/jenchik/listener"
)

func sendRequest(url string) (*http.Response, error) {
	client := http.Client{
		Timeout: time.Second,
	}

	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}

	res, err := client.Do(req)
	if err != nil {
		return nil, err
	}

	return res, nil
}

func worker(l listener.Listener, url string) {
	response, err := sendRequest(url)
	if err != nil {
		fmt.Println("Error request:", err)
		return
	}

	if response.Body == nil {
		fmt.Println("Error empty body")
		return
	}
	defer response.Body.Close()

	l.Broadcast(response.Status)

	// ...
}

func main() {
	start := time.Now()

	pool := listener.NewListeners()

	var wg sync.WaitGroup
	wg.Add(5)
	urlMask := "http://example.com/page/%d"
	for i := 0; i < 5; i++ {
		l, _ := pool.GetOrCreate(i)
		go worker(l, fmt.Sprintf(urlMask, i))
		go func(id int) {
			defer wg.Done()
			fmt.Printf("Request with ID-->%d is '%s'\n", id, l.Wait())
		}(i)
	}

	// ... other work

	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
	case <-time.After(time.Second * 2):
		fmt.Println("Error with timeout")
	}

	fmt.Println("Duration", time.Since(start).String())
}

without waiting

package main

import (
	"fmt"
	"time"

	"github.com/jenchik/listener"
)

type (
	result struct {
		id int
		n  uint64
	}
)

func (r *result) String() string {
	return fmt.Sprintf("Worker ID-->%d; Result-->%d", r.id, r.n)
}

func worker(response, stop listener.Listener, id int) {
	sum := uint64(id * 1000)
	tic := time.NewTicker(time.Millisecond)

	for {
		select {
		case <-tic.C:
			response.Broadcast(&result{id, sum})
		default:
			if _, ok := stop.Receive(); ok {
				tic.Stop()
				return
			}
		}
		sum++
	}
}

func dispatcher(pool *listener.Listeners, count int) {
	stop, _ := pool.GetOrCreate("stop")
	for i := 0; i < count; i++ {
		l, _ := pool.GetOrCreate(i)
		go worker(l, stop, i)
	}

	for {
		if _, ok := stop.Receive(); ok {
			return
		}
		<-time.After(time.Second)
		if _, ok := stop.Receive(); ok {
			return
		}
		for i := 0; i < count; i++ {
			l, _ := pool.GetOrCreate(i)
			if v, ok := l.Receive(); ok {
				fmt.Println(v)
			}
		}
		fmt.Println("")
	}
}

func main() {
	start := time.Now()

	pool := listener.NewListeners()

	go dispatcher(pool, 10)

	time.Sleep(time.Second * 10)

	stop, _ := pool.GetOrCreate("stop")
	stop.Broadcast(false)

	fmt.Println("Duration", time.Since(start).String())
}

Benchmarks

BenchmarkThreadsResend-4             200000000             9.65 ns/op     207.20 MB/s           0 B/op           0 allocs/op
BenchmarkThreadsResendString-4       200000000             9.30 ns/op     215.14 MB/s           0 B/op           0 allocs/op
BenchmarkThreadsResendInt-4          200000000             9.14 ns/op     218.80 MB/s           0 B/op           0 allocs/op
BenchmarkThreadsOnce-4               200000000             9.96 ns/op     200.79 MB/s           0 B/op           0 allocs/op
BenchmarkThreadsOnceString-4         200000000             9.78 ns/op     204.54 MB/s           0 B/op           0 allocs/op
BenchmarkThreadsOnceInt-4            200000000             9.21 ns/op     217.20 MB/s           0 B/op           0 allocs/op

About

Concurent Sub/Unsub pattern for Golang. Can be used as an alternative to channels

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published