Skip to content

Commit

Permalink
Change: proxy gruop strategy improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
Dreamacro committed Dec 10, 2019
1 parent bd4302e commit 2334baf
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 63 deletions.
20 changes: 20 additions & 0 deletions adapters/outboundgroup/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package outboundgroup

import (
"time"

"github.com/Dreamacro/clash/adapters/provider"
C "github.com/Dreamacro/clash/constant"
)

const (
defaultGetProxiesDuration = time.Second * 5
)

func getProvidersProxies(providers []provider.ProxyProvider) []C.Proxy {
proxies := []C.Proxy{}
for _, provider := range providers {
proxies = append(proxies, provider.Proxies()...)
}
return proxies
}
25 changes: 13 additions & 12 deletions adapters/outboundgroup/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (

"github.com/Dreamacro/clash/adapters/outbound"
"github.com/Dreamacro/clash/adapters/provider"
"github.com/Dreamacro/clash/common/singledo"
C "github.com/Dreamacro/clash/constant"
)

type Fallback struct {
*outbound.Base
single *singledo.Single
providers []provider.ProxyProvider
}

Expand Down Expand Up @@ -56,29 +58,28 @@ func (f *Fallback) MarshalJSON() ([]byte, error) {
}

func (f *Fallback) proxies() []C.Proxy {
proxies := []C.Proxy{}
for _, provider := range f.providers {
proxies = append(proxies, provider.Proxies()...)
}
return proxies
elm, _, _ := f.single.Do(func() (interface{}, error) {
return getProvidersProxies(f.providers), nil
})

return elm.([]C.Proxy)
}

func (f *Fallback) findAliveProxy() C.Proxy {
for _, provider := range f.providers {
proxies := provider.Proxies()
for _, proxy := range proxies {
if proxy.Alive() {
return proxy
}
proxies := f.proxies()
for _, proxy := range proxies {
if proxy.Alive() {
return proxy
}
}

return f.providers[0].Proxies()[0]
return f.proxies()[0]
}

func NewFallback(name string, providers []provider.ProxyProvider) *Fallback {
return &Fallback{
Base: outbound.NewBase(name, C.Fallback, false),
single: singledo.NewSingle(defaultGetProxiesDuration),
providers: providers,
}
}
13 changes: 8 additions & 5 deletions adapters/outboundgroup/loadbalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"github.com/Dreamacro/clash/adapters/outbound"
"github.com/Dreamacro/clash/adapters/provider"
"github.com/Dreamacro/clash/common/murmur3"
"github.com/Dreamacro/clash/common/singledo"
C "github.com/Dreamacro/clash/constant"

"golang.org/x/net/publicsuffix"
)

type LoadBalance struct {
*outbound.Base
single *singledo.Single
maxRetry int
providers []provider.ProxyProvider
}
Expand Down Expand Up @@ -98,11 +100,11 @@ func (lb *LoadBalance) SupportUDP() bool {
}

func (lb *LoadBalance) proxies() []C.Proxy {
proxies := []C.Proxy{}
for _, provider := range lb.providers {
proxies = append(proxies, provider.Proxies()...)
}
return proxies
elm, _, _ := lb.single.Do(func() (interface{}, error) {
return getProvidersProxies(lb.providers), nil
})

return elm.([]C.Proxy)
}

func (lb *LoadBalance) MarshalJSON() ([]byte, error) {
Expand All @@ -119,6 +121,7 @@ func (lb *LoadBalance) MarshalJSON() ([]byte, error) {
func NewLoadBalance(name string, providers []provider.ProxyProvider) *LoadBalance {
return &LoadBalance{
Base: outbound.NewBase(name, C.LoadBalance, false),
single: singledo.NewSingle(defaultGetProxiesDuration),
maxRetry: 3,
providers: providers,
}
Expand Down
13 changes: 8 additions & 5 deletions adapters/outboundgroup/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (

"github.com/Dreamacro/clash/adapters/outbound"
"github.com/Dreamacro/clash/adapters/provider"
"github.com/Dreamacro/clash/common/singledo"
C "github.com/Dreamacro/clash/constant"
)

type Selector struct {
*outbound.Base
single *singledo.Single
selected C.Proxy
providers []provider.ProxyProvider
}
Expand Down Expand Up @@ -66,17 +68,18 @@ func (s *Selector) Set(name string) error {
}

func (s *Selector) proxies() []C.Proxy {
proxies := []C.Proxy{}
for _, provider := range s.providers {
proxies = append(proxies, provider.Proxies()...)
}
return proxies
elm, _, _ := s.single.Do(func() (interface{}, error) {
return getProvidersProxies(s.providers), nil
})

return elm.([]C.Proxy)
}

func NewSelector(name string, providers []provider.ProxyProvider) *Selector {
selected := providers[0].Proxies()[0]
return &Selector{
Base: outbound.NewBase(name, C.Selector, false),
single: singledo.NewSingle(defaultGetProxiesDuration),
providers: providers,
selected: selected,
}
Expand Down
84 changes: 43 additions & 41 deletions adapters/outboundgroup/urltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,52 +4,73 @@ import (
"context"
"encoding/json"
"net"
"time"

"github.com/Dreamacro/clash/adapters/outbound"
"github.com/Dreamacro/clash/adapters/provider"
"github.com/Dreamacro/clash/common/singledo"
C "github.com/Dreamacro/clash/constant"
)

type URLTest struct {
*outbound.Base
fast C.Proxy
providers []provider.ProxyProvider
single *singledo.Single
fastSingle *singledo.Single
providers []provider.ProxyProvider
}

func (u *URLTest) Now() string {
return u.fast.Name()
return u.fast().Name()
}

func (u *URLTest) DialContext(ctx context.Context, metadata *C.Metadata) (c C.Conn, err error) {
for i := 0; i < 3; i++ {
c, err = u.fast.DialContext(ctx, metadata)
if err == nil {
c.AppendToChains(u)
return
}
u.fallback()
c, err = u.fast().DialContext(ctx, metadata)
if err == nil {
c.AppendToChains(u)
}
return
return c, err
}

func (u *URLTest) DialUDP(metadata *C.Metadata) (C.PacketConn, net.Addr, error) {
pc, addr, err := u.fast.DialUDP(metadata)
pc, addr, err := u.fast().DialUDP(metadata)
if err == nil {
pc.AppendToChains(u)
}
return pc, addr, err
}

func (u *URLTest) proxies() []C.Proxy {
proxies := []C.Proxy{}
for _, provider := range u.providers {
proxies = append(proxies, provider.Proxies()...)
}
return proxies
elm, _, _ := u.single.Do(func() (interface{}, error) {
return getProvidersProxies(u.providers), nil
})

return elm.([]C.Proxy)
}

func (u *URLTest) fast() C.Proxy {
elm, _, _ := u.fastSingle.Do(func() (interface{}, error) {
proxies := u.proxies()
fast := proxies[0]
min := fast.LastDelay()
for _, proxy := range proxies[1:] {
if !proxy.Alive() {
continue
}

delay := proxy.LastDelay()
if delay < min {
fast = proxy
min = delay
}
}
return fast, nil
})

return elm.(C.Proxy)
}

func (u *URLTest) SupportUDP() bool {
return u.fast.SupportUDP()
return u.fast().SupportUDP()
}

func (u *URLTest) MarshalJSON() ([]byte, error) {
Expand All @@ -64,30 +85,11 @@ func (u *URLTest) MarshalJSON() ([]byte, error) {
})
}

func (u *URLTest) fallback() {
proxies := u.proxies()
fast := proxies[0]
min := fast.LastDelay()
for _, proxy := range proxies[1:] {
if !proxy.Alive() {
continue
}

delay := proxy.LastDelay()
if delay < min {
fast = proxy
min = delay
}
}
u.fast = fast
}

func NewURLTest(name string, providers []provider.ProxyProvider) *URLTest {
fast := providers[0].Proxies()[0]

return &URLTest{
Base: outbound.NewBase(name, C.URLTest, false),
fast: fast,
providers: providers,
Base: outbound.NewBase(name, C.URLTest, false),
single: singledo.NewSingle(defaultGetProxiesDuration),
fastSingle: singledo.NewSingle(time.Second * 10),
providers: providers,
}
}
54 changes: 54 additions & 0 deletions common/singledo/singledo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package singledo

import (
"sync"
"time"
)

type call struct {
wg sync.WaitGroup
val interface{}
err error
}

type Single struct {
mux sync.Mutex
last int64
wait int64
call *call
result *Result
}

type Result struct {
Val interface{}
Err error
}

func (s *Single) Do(fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
s.mux.Lock()
now := time.Now().Unix()
if now < s.last+s.wait {
s.mux.Unlock()
return s.result.Val, s.result.Err, true
}

if call := s.call; call != nil {
s.mux.Unlock()
call.wg.Wait()
return call.val, call.err, true
}

call := &call{}
call.wg.Add(1)
s.call = call
s.mux.Unlock()
call.val, call.err = fn()
s.call = nil
s.result = &Result{call.val, call.err}
s.last = now
return call.val, call.err, false
}

func NewSingle(wait time.Duration) *Single {
return &Single{wait: int64(wait)}
}
52 changes: 52 additions & 0 deletions common/singledo/singledo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package singledo

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestBasic(t *testing.T) {
single := NewSingle(time.Millisecond * 30)
foo := 0
shardCount := 0
call := func() (interface{}, error) {
foo++
return nil, nil
}

var wg sync.WaitGroup
const n = 10
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
_, _, shard := single.Do(call)
if shard {
shardCount++
}
wg.Done()
}()
}

wg.Wait()
assert.Equal(t, 1, foo)
assert.Equal(t, 9, shardCount)
}

func TestTimer(t *testing.T) {
single := NewSingle(time.Millisecond * 30)
foo := 0
call := func() (interface{}, error) {
foo++
return nil, nil
}

single.Do(call)
time.Sleep(10 * time.Millisecond)
_, _, shard := single.Do(call)

assert.Equal(t, 1, foo)
assert.True(t, shard)
}

0 comments on commit 2334baf

Please sign in to comment.